http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mobile/src/assemble/appPackage.xml ---------------------------------------------------------------------- diff --git a/examples/mobile/src/assemble/appPackage.xml b/examples/mobile/src/assemble/appPackage.xml new file mode 100644 index 0000000..4138cf2 --- /dev/null +++ b/examples/mobile/src/assemble/appPackage.xml @@ -0,0 +1,59 @@ +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +--> +<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd"> + <id>appPackage</id> + <formats> + <format>jar</format> + </formats> + <includeBaseDirectory>false</includeBaseDirectory> + <fileSets> + <fileSet> + <directory>${basedir}/target/</directory> + <outputDirectory>/app</outputDirectory> + <includes> + <include>${project.artifactId}-${project.version}.jar</include> + </includes> + </fileSet> + <fileSet> + <directory>${basedir}/target/deps</directory> + <outputDirectory>/lib</outputDirectory> + </fileSet> + <fileSet> + <directory>${basedir}/src/site/conf</directory> + <outputDirectory>/conf</outputDirectory> + <includes> + <include>*.xml</include> + </includes> + </fileSet> + <fileSet> + <directory>${basedir}/src/main/resources/META-INF</directory> + <outputDirectory>/META-INF</outputDirectory> + </fileSet> + <fileSet> + <directory>${basedir}/src/main/resources/app</directory> + <outputDirectory>/app</outputDirectory> + </fileSet> + </fileSets> + +</assembly> +
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mobile/src/main/java/org/apache/apex/examples/mobile/Application.java ---------------------------------------------------------------------- diff --git a/examples/mobile/src/main/java/org/apache/apex/examples/mobile/Application.java b/examples/mobile/src/main/java/org/apache/apex/examples/mobile/Application.java new file mode 100644 index 0000000..f719643 --- /dev/null +++ b/examples/mobile/src/main/java/org/apache/apex/examples/mobile/Application.java @@ -0,0 +1,172 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.examples.mobile; + +import java.net.URI; +import java.util.Arrays; +import java.util.Map; +import java.util.Random; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.commons.lang.mutable.MutableLong; +import org.apache.commons.lang3.Range; +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DAG; +import com.datatorrent.api.StatsListener; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.api.annotation.ApplicationAnnotation; +import com.datatorrent.lib.counters.BasicCounters; +import com.datatorrent.lib.io.PubSubWebSocketInputOperator; +import com.datatorrent.lib.io.PubSubWebSocketOutputOperator; +import com.datatorrent.lib.partitioner.StatelessThroughputBasedPartitioner; +import com.datatorrent.lib.testbench.RandomEventGenerator; + +/** + * Mobile Example Application: + * <p> + * This example simulates large number of cell phones in the range of 40K to 200K + * and tracks a given cell number across cell towers. It also displays the changing locations of the cell number on a google map. + * + * This example demonstrates the scalability feature of the Apex platform. + * It showcases the ability of the platform to scale up and down as the phone numbers generated increase and decrease respectively. + * If the tuples processed per second by the pmove operator increase beyond 30,000, more partitions of the pmove operator gets deployed until + * each of the partition processes around 10000 to 30000 tuples per second. + * If the tuples processed per second drops below 10,000, the platform merges the operators until the partition count drops down to the original. + * The load can be varied using the tuplesBlast property. + * If the tuplesBlast is set to 200, 40K cell phones are generated. + * If the tuplesBlast is set to 1000, 200K cell phones are generated. + * The tuplesBlast property can be set using dtcli command: 'set-operator-property pmove tuplesBlast 1000'. + * + * + * The specs are as such<br> + * Depending on the tuplesBlast property, large number of cell phone numbers are generated. + * They jump a cell tower frequently. Sometimes + * within a second sometimes in 10 seconds. The aim is to demonstrate the + * following abilities<br> + * <ul> + * <li>Entering query dynamically: The phone numbers are added to locate its gps + * in run time.</li> + * <li>Changing functionality dynamically: The load is changed by making + * functional changes on the load generator operator (phonegen)(</li> + * <li>Auto Scale up/Down with load: Operator pmove increases and decreases + * partitions as per load</li> + * <li></li> + * </ul> + * + * Refer to examples/docs/MobileDemo.md for more information. + * + * <p> + * + * Running Java Test or Main app in IDE: + * + * <pre> + * LocalMode.runApp(new Application(), 600000); // 10 min run + * </pre> + * + * Run Success : <br> + * For successful deployment and run, user should see following output on + * console: <br> + * + * <pre> + * phoneLocationQueryResult: {phone=5556101, location=(5,9), queryId=q3} + * phoneLocationQueryResult: {phone=5554995, location=(10,4), queryId=q1} + * phoneLocationQueryResult: {phone=5556101, location=(5,9), queryId=q3} + * phoneLocationQueryResult: {phone=5554995, location=(10,4), queryId=q1} + * phoneLocationQueryResult: {phone=5554995, location=(10,5), queryId=q1} + * phoneLocationQueryResult: {phone=5556101, location=(5,9), queryId=q3} + * phoneLocationQueryResult: {phone=5554995, location=(9,5), queryId=q1} + * phoneLocationQueryResult: {phone=5556101, location=(5,9), queryId=q3} + * phoneLocationQueryResult: {phone=5556101, location=(5,9), queryId=q3} + * phoneLocationQueryResult: {phone=5554995, location=(9,5), queryId=q1} + * phoneLocationQueryResult: {phone=5554995, location=(9,5), queryId=q1} + * phoneLocationQueryResult: {phone=5556101, location=(5,9), queryId=q3} + * </pre> + * + * * <b>Application DAG : </b><br> + * <img src="doc-files/mobile.png" width=600px > <br> + * + * @since 0.3.2 + */ +@ApplicationAnnotation(name = "MobileExample") +public class Application implements StreamingApplication +{ + public static final String PHONE_RANGE_PROP = "dt.application.MobileExample.phoneRange"; + public static final String TOTAL_SEED_NOS = "dt.application.MobileExample.totalSeedNumbers"; + public static final String COOL_DOWN_MILLIS = "dt.application.MobileExample.coolDownMillis"; + public static final String MAX_THROUGHPUT = "dt.application.MobileExample.maxThroughput"; + public static final String MIN_THROUGHPUT = "dt.application.MobileExample.minThroughput"; + private static final Logger LOG = LoggerFactory.getLogger(Application.class); + private Range<Integer> phoneRange = Range.between(5550000, 5559999); + + @Override + public void populateDAG(DAG dag, Configuration conf) + { + String lPhoneRange = conf.get(PHONE_RANGE_PROP, null); + if (lPhoneRange != null) { + String[] tokens = lPhoneRange.split("-"); + if (tokens.length != 2) { + throw new IllegalArgumentException("Invalid range: " + lPhoneRange); + } + this.phoneRange = Range.between(Integer.parseInt(tokens[0]), Integer.parseInt(tokens[1])); + } + LOG.debug("Phone range {}", this.phoneRange); + + RandomEventGenerator phones = dag.addOperator("Receiver", RandomEventGenerator.class); + phones.setMinvalue(this.phoneRange.getMinimum()); + phones.setMaxvalue(this.phoneRange.getMaximum()); + + PhoneMovementGenerator movementGen = dag.addOperator("LocationFinder", PhoneMovementGenerator.class); + dag.setAttribute(movementGen, OperatorContext.COUNTERS_AGGREGATOR, new BasicCounters.LongAggregator<MutableLong>()); + + StatelessThroughputBasedPartitioner<PhoneMovementGenerator> partitioner = new StatelessThroughputBasedPartitioner<PhoneMovementGenerator>(); + partitioner.setCooldownMillis(conf.getLong(COOL_DOWN_MILLIS, 45000)); + partitioner.setMaximumEvents(conf.getLong(MAX_THROUGHPUT, 30000)); + partitioner.setMinimumEvents(conf.getLong(MIN_THROUGHPUT, 10000)); + dag.setAttribute(movementGen, OperatorContext.STATS_LISTENERS, Arrays.asList(new StatsListener[]{partitioner})); + dag.setAttribute(movementGen, OperatorContext.PARTITIONER, partitioner); + + // generate seed numbers + Random random = new Random(); + int maxPhone = phoneRange.getMaximum() - phoneRange.getMinimum(); + int phonesToDisplay = conf.getInt(TOTAL_SEED_NOS, 10); + for (int i = phonesToDisplay; i-- > 0; ) { + int phoneNo = phoneRange.getMinimum() + random.nextInt(maxPhone + 1); + LOG.info("seed no: " + phoneNo); + movementGen.phoneRegister.add(phoneNo); + } + // done generating data + LOG.info("Finished generating seed data."); + + String gatewayAddress = dag.getValue(DAG.GATEWAY_CONNECT_ADDRESS); + URI uri = URI.create("ws://" + gatewayAddress + "/pubsub"); + PubSubWebSocketOutputOperator<Object> wsOut = dag.addOperator("LocationResults", new PubSubWebSocketOutputOperator<Object>()); + wsOut.setUri(uri); + PubSubWebSocketInputOperator<Map<String, String>> wsIn = dag.addOperator("QueryLocation", new PubSubWebSocketInputOperator<Map<String, String>>()); + wsIn.setUri(uri); + // default partitioning: first connected stream to movementGen will be partitioned + dag.addStream("Phone-Data", phones.integer_data, movementGen.data); + dag.addStream("Results", movementGen.locationQueryResult, wsOut.input); + dag.addStream("Query", wsIn.outputPort, movementGen.phoneQuery); + } + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mobile/src/main/java/org/apache/apex/examples/mobile/PhoneEntryOperator.java ---------------------------------------------------------------------- diff --git a/examples/mobile/src/main/java/org/apache/apex/examples/mobile/PhoneEntryOperator.java b/examples/mobile/src/main/java/org/apache/apex/examples/mobile/PhoneEntryOperator.java new file mode 100644 index 0000000..f8de357 --- /dev/null +++ b/examples/mobile/src/main/java/org/apache/apex/examples/mobile/PhoneEntryOperator.java @@ -0,0 +1,124 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.examples.mobile; + +import java.util.Map; +import java.util.Random; +import javax.validation.constraints.Min; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Maps; +import com.google.common.collect.Range; + +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.annotation.InputPortFieldAnnotation; +import com.datatorrent.common.util.BaseOperator; + +/** + * Generates mobile numbers that will be displayed in mobile example just after launch.<br></br> + * Operator attributes:<b> + * <ul> + * <li>initialDisplayCount: No. of seed phone numbers that will be generated.</li> + * <li>maxSeedPhoneNumber: The largest seed phone number.</li> + * </ul> + * </b> + * + * @since 0.3.5 + */ +public class PhoneEntryOperator extends BaseOperator +{ + private static Logger LOG = LoggerFactory.getLogger(PhoneEntryOperator.class); + + private boolean seedGenerationDone = false; + + @Min(0) + private int initialDisplayCount = 0; + + private int maxSeedPhoneNumber = 0; + private int rangeLowerEndpoint; + private int rangeUpperEndpoint; + + /** + * Sets the initial number of phones to display on the google map. + * + * @param i the count of initial phone numbers to display + */ + public void setInitialDisplayCount(int i) + { + initialDisplayCount = i; + } + + /** + * Sets the range for the phone numbers generated by the operator. + * + * @param i the range within which the phone numbers are randomly generated. + */ + public void setPhoneRange(Range<Integer> phoneRange) + { + this.rangeLowerEndpoint = phoneRange.lowerEndpoint(); + this.rangeUpperEndpoint = phoneRange.upperEndpoint(); + } + + /** + * Sets the max seed for random phone number generation + * + * @param i the number to initialize the random number phone generator. + */ + public void setMaxSeedPhoneNumber(int number) + { + this.maxSeedPhoneNumber = number; + } + + @InputPortFieldAnnotation(optional = true) + public final transient DefaultInputPort<Map<String, String>> locationQuery = new DefaultInputPort<Map<String, String>>() + { + @Override + public void process(Map<String, String> tuple) + { + seedPhones.emit(tuple); + } + }; + + public final transient DefaultOutputPort<Map<String, String>> seedPhones = new DefaultOutputPort<Map<String, String>>(); + + @Override + public void beginWindow(long windowId) + { + if (!seedGenerationDone) { + Random random = new Random(); + int maxPhone = (maxSeedPhoneNumber <= rangeUpperEndpoint && maxSeedPhoneNumber >= rangeLowerEndpoint) ? maxSeedPhoneNumber : rangeUpperEndpoint; + maxPhone -= 5550000; + int phonesToDisplay = initialDisplayCount > maxPhone ? maxPhone : initialDisplayCount; + for (int i = phonesToDisplay; i-- > 0; ) { + int phoneNo = 5550000 + random.nextInt(maxPhone + 1); + LOG.info("seed no: " + phoneNo); + Map<String, String> valueMap = Maps.newHashMap(); + valueMap.put(PhoneMovementGenerator.KEY_COMMAND, PhoneMovementGenerator.COMMAND_ADD); + valueMap.put(PhoneMovementGenerator.KEY_PHONE, Integer.toString(phoneNo)); + seedPhones.emit(valueMap); + } + // done generating data + seedGenerationDone = true; + LOG.info("Finished generating seed data."); + } + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mobile/src/main/java/org/apache/apex/examples/mobile/PhoneMovementGenerator.java ---------------------------------------------------------------------- diff --git a/examples/mobile/src/main/java/org/apache/apex/examples/mobile/PhoneMovementGenerator.java b/examples/mobile/src/main/java/org/apache/apex/examples/mobile/PhoneMovementGenerator.java new file mode 100644 index 0000000..3a293f0 --- /dev/null +++ b/examples/mobile/src/main/java/org/apache/apex/examples/mobile/PhoneMovementGenerator.java @@ -0,0 +1,335 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.examples.mobile; + +import java.util.HashMap; +import java.util.Map; +import java.util.Random; +import java.util.Set; + +import javax.validation.constraints.Min; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.commons.lang.mutable.MutableLong; + +import com.google.common.base.Strings; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; + +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.annotation.InputPortFieldAnnotation; +import com.datatorrent.common.util.BaseOperator; +import com.datatorrent.lib.counters.BasicCounters; +import com.datatorrent.lib.util.HighLow; + +/** + * <p> + * This operator generates the GPS locations for the phone numbers specified. + * The range of phone numbers or a specific phone number can be set for which the GPS locations will be generated. + * It supports querying the locations of a given phone number. + * This is a partionable operator that can partition as the tuplesBlast increases. + * </p> + * + * @since 0.3.2 + */ +public class PhoneMovementGenerator extends BaseOperator +{ + public final transient DefaultInputPort<Integer> data = new DefaultInputPort<Integer>() + { + @Override + public void process(Integer tuple) + { + HighLow<Integer> loc = gps.get(tuple); + if (loc == null) { + loc = new HighLow<Integer>(random.nextInt(range), random.nextInt(range)); + gps.put(tuple, loc); + } + int xloc = loc.getHigh(); + int yloc = loc.getLow(); + int state = rotate % 4; + + // Compute new location + int delta = random.nextInt(100); + if (delta >= threshold) { + if (state < 2) { + xloc++; + } else { + xloc--; + } + if (xloc < 0) { + xloc += range; + } + } + delta = random.nextInt(100); + if (delta >= threshold) { + if ((state == 1) || (state == 3)) { + yloc++; + } else { + yloc--; + } + if (yloc < 0) { + yloc += range; + } + } + xloc %= range; + yloc %= range; + + // Set new location + HighLow<Integer> nloc = newgps.get(tuple); + if (nloc == null) { + newgps.put(tuple, new HighLow<Integer>(xloc, yloc)); + } else { + nloc.setHigh(xloc); + nloc.setLow(yloc); + } + rotate++; + } + }; + + @InputPortFieldAnnotation(optional = true) + public final transient DefaultInputPort<Map<String,String>> phoneQuery = new DefaultInputPort<Map<String,String>>() + { + @Override + public void process(Map<String,String> tuple) + { + LOG.info("new query {}", tuple); + String command = tuple.get(KEY_COMMAND); + if (command != null) { + if (command.equals(COMMAND_ADD)) { + commandCounters.getCounter(CommandCounters.ADD).increment(); + String phoneStr = tuple.get(KEY_PHONE); + registerPhone(phoneStr); + } else if (command.equals(COMMAND_ADD_RANGE)) { + commandCounters.getCounter(CommandCounters.ADD_RANGE).increment(); + registerPhoneRange(tuple.get(KEY_START_PHONE), tuple.get(KEY_END_PHONE)); + } else if (command.equals(COMMAND_DELETE)) { + commandCounters.getCounter(CommandCounters.DELETE).increment(); + String phoneStr = tuple.get(KEY_PHONE); + deregisterPhone(phoneStr); + } else if (command.equals(COMMAND_CLEAR)) { + commandCounters.getCounter(CommandCounters.CLEAR).increment(); + clearPhones(); + } + } + } + }; + + public static final String KEY_COMMAND = "command"; + public static final String KEY_PHONE = "phone"; + public static final String KEY_LOCATION = "location"; + public static final String KEY_REMOVED = "removed"; + public static final String KEY_START_PHONE = "startPhone"; + public static final String KEY_END_PHONE = "endPhone"; + + public static final String COMMAND_ADD = "add"; + public static final String COMMAND_ADD_RANGE = "addRange"; + public static final String COMMAND_DELETE = "del"; + public static final String COMMAND_CLEAR = "clear"; + + final Set<Integer> phoneRegister = Sets.newHashSet(); + + private final transient HashMap<Integer, HighLow<Integer>> gps = new HashMap<Integer, HighLow<Integer>>(); + private final Random random = new Random(); + private int range = 50; + private int threshold = 80; + private int rotate = 0; + + protected BasicCounters<MutableLong> commandCounters; + + private transient OperatorContext context; + private final transient HashMap<Integer, HighLow<Integer>> newgps = new HashMap<Integer, HighLow<Integer>>(); + + public PhoneMovementGenerator() + { + this.commandCounters = new BasicCounters<MutableLong>(MutableLong.class); + } + + /** + * @return the range of the phone numbers + */ + @Min(0) + public int getRange() + { + return range; + } + + /** + * Sets the range of phone numbers for which the GPS locations need to be generated. + * + * @param i the range of phone numbers to set + */ + public void setRange(int i) + { + range = i; + } + + /** + * @return the threshold + */ + @Min(0) + public int getThreshold() + { + return threshold; + } + + /** + * Sets the threshold that decides how frequently the GPS locations are updated. + * + * @param i the value that decides how frequently the GPS locations change. + */ + public void setThreshold(int i) + { + threshold = i; + } + + private void registerPhone(String phoneStr) + { + // register the phone channel + if (Strings.isNullOrEmpty(phoneStr)) { + return; + } + try { + Integer phone = new Integer(phoneStr); + registerSinglePhone(phone); + } catch (NumberFormatException nfe) { + LOG.warn("Invalid no {}", phoneStr); + } + } + + private void registerPhoneRange(String startPhoneStr, String endPhoneStr) + { + if (Strings.isNullOrEmpty(startPhoneStr) || Strings.isNullOrEmpty(endPhoneStr)) { + LOG.warn("Invalid phone range {} {}", startPhoneStr, endPhoneStr); + return; + } + try { + Integer startPhone = new Integer(startPhoneStr); + Integer endPhone = new Integer(endPhoneStr); + if (endPhone < startPhone) { + LOG.warn("Invalid phone range {} {}", startPhone, endPhone); + return; + } + for (int i = startPhone; i <= endPhone; i++) { + registerSinglePhone(i); + } + } catch (NumberFormatException nfe) { + LOG.warn("Invalid phone range <{},{}>", startPhoneStr, endPhoneStr); + } + } + + private void registerSinglePhone(int phone) + { + phoneRegister.add(phone); + LOG.debug("Registered query id with phone {}", phone); + emitQueryResult(phone); + } + + private void deregisterPhone(String phoneStr) + { + if (Strings.isNullOrEmpty(phoneStr)) { + return; + } + try { + Integer phone = new Integer(phoneStr); + // remove the channel + if (phoneRegister.contains(phone)) { + phoneRegister.remove(phone); + LOG.debug("Removing query id {}", phone); + emitPhoneRemoved(phone); + } + } catch (NumberFormatException nfe) { + LOG.warn("Invalid phone {}", phoneStr); + } + } + + private void clearPhones() + { + phoneRegister.clear(); + LOG.info("Clearing phones"); + } + + public final transient DefaultOutputPort<Map<String, String>> locationQueryResult = new DefaultOutputPort<Map<String, String>>(); + + @Override + public void setup(OperatorContext context) + { + this.context = context; + commandCounters.setCounter(CommandCounters.ADD, new MutableLong()); + commandCounters.setCounter(CommandCounters.ADD_RANGE, new MutableLong()); + commandCounters.setCounter(CommandCounters.DELETE, new MutableLong()); + commandCounters.setCounter(CommandCounters.CLEAR, new MutableLong()); + } + + /** + * Emit all the data and clear the hash + */ + @Override + public void endWindow() + { + for (Map.Entry<Integer, HighLow<Integer>> e: newgps.entrySet()) { + HighLow<Integer> loc = gps.get(e.getKey()); + if (loc == null) { + gps.put(e.getKey(), e.getValue()); + } else { + loc.setHigh(e.getValue().getHigh()); + loc.setLow(e.getValue().getLow()); + } + } + boolean found = false; + for (Integer phone: phoneRegister) { + emitQueryResult( phone); + found = true; + } + if (!found) { + LOG.debug("No phone number"); + } + newgps.clear(); + context.setCounters(commandCounters); + } + + private void emitQueryResult(Integer phone) + { + HighLow<Integer> loc = gps.get(phone); + if (loc != null) { + Map<String, String> queryResult = new HashMap<String, String>(); + queryResult.put(KEY_PHONE, String.valueOf(phone)); + queryResult.put(KEY_LOCATION, loc.toString()); + locationQueryResult.emit(queryResult); + } + } + + private void emitPhoneRemoved(Integer phone) + { + Map<String,String> removedResult = Maps.newHashMap(); + removedResult.put(KEY_PHONE, String.valueOf(phone)); + removedResult.put(KEY_REMOVED,"true"); + locationQueryResult.emit(removedResult); + } + + public static enum CommandCounters + { + ADD, ADD_RANGE, DELETE, CLEAR + } + + private static final Logger LOG = LoggerFactory.getLogger(PhoneMovementGenerator.class); +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mobile/src/main/java/org/apache/apex/examples/mobile/doc-files/Mobile.png ---------------------------------------------------------------------- diff --git a/examples/mobile/src/main/java/org/apache/apex/examples/mobile/doc-files/Mobile.png b/examples/mobile/src/main/java/org/apache/apex/examples/mobile/doc-files/Mobile.png new file mode 100644 index 0000000..a25da0d Binary files /dev/null and b/examples/mobile/src/main/java/org/apache/apex/examples/mobile/doc-files/Mobile.png differ http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mobile/src/main/java/org/apache/apex/examples/mobile/package-info.java ---------------------------------------------------------------------- diff --git a/examples/mobile/src/main/java/org/apache/apex/examples/mobile/package-info.java b/examples/mobile/src/main/java/org/apache/apex/examples/mobile/package-info.java new file mode 100644 index 0000000..1262504 --- /dev/null +++ b/examples/mobile/src/main/java/org/apache/apex/examples/mobile/package-info.java @@ -0,0 +1,22 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/** + * Mobile phones tracking demonstration application. + */ +package org.apache.apex.examples.mobile; http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mobile/src/main/resources/META-INF/properties.xml ---------------------------------------------------------------------- diff --git a/examples/mobile/src/main/resources/META-INF/properties.xml b/examples/mobile/src/main/resources/META-INF/properties.xml new file mode 100644 index 0000000..e213d18 --- /dev/null +++ b/examples/mobile/src/main/resources/META-INF/properties.xml @@ -0,0 +1,82 @@ +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +--> +<configuration> + <property> + <name>dt.application.MobileExample.coolDownMillis</name> + <value>45000</value> + </property> + <property> + <name>dt.application.MobileExample.maxThroughput</name> + <value>30000</value> + </property> + <property> + <name>dt.application.MobileExample.minThroughput</name> + <value>10000</value> + </property> + <property> + <name>dt.application.MobileExample.operator.Receiver.tuplesBlast</name> + <value>200</value> + </property> + <property> + <name>dt.application.MobileExample.operator.Receiver.tuplesBlastIntervalMillis</name> + <value>5</value> + </property> + <property> + <name>dt.application.MobileExample.operator.Receiver.outputport.integer_data.attr.QUEUE_CAPACITY</name> + <value>32768</value> + </property> + <property> + <name>dt.application.MobileExample.operator.LocationFinder.range</name> + <value>20</value> + </property> + <property> + <name>dt.application.MobileExample.operator.LocationFinder.threshold</name> + <value>80</value> + </property> + <property> + <name>dt.application.MobileExample.operator.LocationFinder.inputport.data.attr.QUEUE_CAPACITY</name> + <value>32768</value> + </property> + <property> + <name>dt.application.MobileExample.operator.LocationResults.prop.topic</name> + <value>examples.mobile.phoneLocationQueryResult</value> + </property> + <property> + <name>dt.application.MobileExample.operator.QueryLocation.prop.topic</name> + <value>examples.mobile.phoneLocationQuery</value> + </property> + <property> + <name>dt.application.MobileExample.operator.*.attr.MEMORY_MB</name> + <value>768</value> + </property> + <property> + <name>dt.application.MobileExample.operator.*.attr.JVM_OPTIONS</name> + <value>-Xmx128m</value> + </property> + <property> + <name>dt.application.MobileExample.operator.*.port.*.attr.BUFFER_MEMORY_MB</name> + <value>256</value> + </property> + <property> + <name>dt.application.MobileExample.attr.MASTER_MEMORY_MB</name> + <value>1024</value> + </property> +</configuration> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mobile/src/test/java/org/apache/apex/examples/mobile/ApplicationTest.java ---------------------------------------------------------------------- diff --git a/examples/mobile/src/test/java/org/apache/apex/examples/mobile/ApplicationTest.java b/examples/mobile/src/test/java/org/apache/apex/examples/mobile/ApplicationTest.java new file mode 100644 index 0000000..ce6ca41 --- /dev/null +++ b/examples/mobile/src/test/java/org/apache/apex/examples/mobile/ApplicationTest.java @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.examples.mobile; + +import java.net.URI; +import java.util.HashMap; +import java.util.Map; + +import javax.servlet.Servlet; + +import org.eclipse.jetty.server.Connector; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.servlet.ServletContextHandler; +import org.eclipse.jetty.servlet.ServletHolder; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.LocalMode; + +import com.datatorrent.lib.helper.SamplePubSubWebSocketServlet; +import com.datatorrent.lib.io.PubSubWebSocketInputOperator; +import com.datatorrent.lib.io.PubSubWebSocketOutputOperator; +import com.datatorrent.lib.testbench.CollectorTestSink; + +public class ApplicationTest +{ + private static final Logger LOG = LoggerFactory.getLogger(ApplicationTest.class); + + public ApplicationTest() + { + } + + /** + * Test of getApplication method, of class Application. + */ + @Test + public void testGetApplication() throws Exception + { + Configuration conf = new Configuration(false); + conf.addResource("dt-site-mobile.xml"); + Server server = new Server(0); + Servlet servlet = new SamplePubSubWebSocketServlet(); + ServletHolder sh = new ServletHolder(servlet); + ServletContextHandler contextHandler = new ServletContextHandler(server, "/", ServletContextHandler.SESSIONS); + contextHandler.addServlet(sh, "/pubsub"); + contextHandler.addServlet(sh, "/*"); + server.start(); + Connector[] connector = server.getConnectors(); + conf.set("dt.attr.GATEWAY_CONNECT_ADDRESS", "localhost:" + connector[0].getLocalPort()); + URI uri = URI.create("ws://localhost:" + connector[0].getLocalPort() + "/pubsub"); + + PubSubWebSocketOutputOperator<Object> outputOperator = new PubSubWebSocketOutputOperator<Object>(); + outputOperator.setUri(uri); + outputOperator.setTopic(conf.get("dt.application.MobileExample.operator.QueryLocation.topic")); + + PubSubWebSocketInputOperator<Map<String, String>> inputOperator = new PubSubWebSocketInputOperator<Map<String, String>>(); + inputOperator.setUri(uri); + inputOperator.setTopic(conf.get("dt.application.MobileExample.operator.LocationResults.topic")); + + CollectorTestSink<Object> sink = new CollectorTestSink<Object>(); + inputOperator.outputPort.setSink(sink); + + Map<String, String> data = new HashMap<String, String>(); + data.put("command", "add"); + data.put("phone", "5559990"); + + Application app = new Application(); + LocalMode lma = LocalMode.newInstance(); + lma.prepareDAG(app, conf); + LocalMode.Controller lc = lma.getController(); + lc.setHeartbeatMonitoringEnabled(false); + lc.runAsync(); + Thread.sleep(5000); + inputOperator.setup(null); + outputOperator.setup(null); + inputOperator.activate(null); + outputOperator.beginWindow(0); + outputOperator.input.process(data); + outputOperator.endWindow(); + inputOperator.beginWindow(0); + int timeoutMillis = 5000; + while (sink.collectedTuples.size() < 5 && timeoutMillis > 0) { + inputOperator.emitTuples(); + timeoutMillis -= 20; + Thread.sleep(20); + } + inputOperator.endWindow(); + lc.shutdown(); + inputOperator.teardown(); + outputOperator.teardown(); + server.stop(); + Assert.assertTrue("size of output is 5 ", sink.collectedTuples.size() == 5); + for (Object obj : sink.collectedTuples) { + Assert.assertEquals("Expected phone number", "5559990", ((Map<String, String>)obj).get("phone")); + } + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mobile/src/test/resources/dt-site-mobile.xml ---------------------------------------------------------------------- diff --git a/examples/mobile/src/test/resources/dt-site-mobile.xml b/examples/mobile/src/test/resources/dt-site-mobile.xml new file mode 100644 index 0000000..dcb6c98 --- /dev/null +++ b/examples/mobile/src/test/resources/dt-site-mobile.xml @@ -0,0 +1,87 @@ +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +--> +<configuration> + <property> + <name>dt.application.MobileExample.class</name> + <value>org.apache.apex.examples.mobile.Application</value> + <description>An alias for the application</description> + </property> + <!--property> + <name>dt.attr.GATEWAY_CONNECT_ADDRESS</name> + <value>localhost:19090</value> + </property--> + <property> + <name>dt.application.MobileExample.totalSeedNumbers</name> + <value>0</value> + </property> + <property> + <name>dt.application.MobileExample.coolDownMillis</name> + <value>45000</value> + </property> + <property> + <name>dt.application.MobileExample.maxThroughput</name> + <value>30000</value> + </property> + <property> + <name>dt.application.MobileExample.minThroughput</name> + <value>1</value> + </property> + <property> + <name>dt.application.MobileExample.operator.Receiver.tuplesBlast</name> + <value>200</value> + </property> + <property> + <name>dt.application.MobileExample.operator.Receiver.tuplesBlastIntervalMillis</name> + <value>5</value> + </property> + <property> + <name>dt.application.MobileExample.operator.Receiver.outputport.integer_data.attr.QUEUE_CAPACITY</name> + <value>32768</value> + </property> + <property> + <name>dt.application.MobileExample.operator.LocationFinder.range</name> + <value>20</value> + </property> + <property> + <name>dt.application.MobileExample.operator.LocationFinder.threshold</name> + <value>80</value> + </property> + <property> + <name>dt.application.MobileExample.operator.LocationFinder.inputport.data.attr.QUEUE_CAPACITY</name> + <value>32768</value> + </property> + <property> + <name>dt.application.MobileExample.operator.LocationResults.topic</name> + <value>resultTopic</value> + </property> + <property> + <name>dt.application.MobileExample.operator.QueryLocation.topic</name> + <value>queryTopic</value> + </property> + <property> + <name>dt.application.MobileExample.operator.*.attr.MEMORY_MB</name> + <value>2048</value> + </property> + <property> + <name>dt.application.MobileExample.attr.MASTER_MEMORY_MB</name> + <value>1024</value> + </property> +</configuration> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mobile/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/examples/mobile/src/test/resources/log4j.properties b/examples/mobile/src/test/resources/log4j.properties new file mode 100644 index 0000000..cf0d19e --- /dev/null +++ b/examples/mobile/src/test/resources/log4j.properties @@ -0,0 +1,43 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +log4j.rootLogger=DEBUG,CONSOLE + +log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender +log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout +log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n +log4j.appender.CONSOLE.threshold=${test.log.console.threshold} +test.log.console.threshold=DEBUG + +log4j.appender.RFA=org.apache.log4j.RollingFileAppender +log4j.appender.RFA.layout=org.apache.log4j.PatternLayout +log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n +log4j.appender.RFA.File=/tmp/app.log + +# to enable, add SYSLOG to rootLogger +log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender +log4j.appender.SYSLOG.syslogHost=127.0.0.1 +log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout +log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - %m%n +log4j.appender.SYSLOG.Facility=LOCAL1 + +log4j.logger.org=info +#log4j.logger.org.apache.commons.beanutils=warn +log4j.logger.com.datatorrent=debug +log4j.logger.org.apache.apex=debug http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mrmonitor/pom.xml ---------------------------------------------------------------------- diff --git a/examples/mrmonitor/pom.xml b/examples/mrmonitor/pom.xml new file mode 100644 index 0000000..527565b --- /dev/null +++ b/examples/mrmonitor/pom.xml @@ -0,0 +1,64 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <artifactId>malhar-examples-mrmonitor</artifactId> + <packaging>jar</packaging> + + <name>Apache Apex Malhar MR Monitoring Example</name> + <description></description> + + <parent> + <groupId>org.apache.apex</groupId> + <artifactId>malhar-examples</artifactId> + <version>3.7.0-SNAPSHOT</version> + </parent> + + <properties> + <skipTests>true</skipTests> + </properties> + + <dependencies> + <dependency> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-websocket</artifactId> + <version>8.1.10.v20130312</version> + <scope>test</scope> + <type>jar</type> + </dependency> + <dependency> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-servlet</artifactId> + <version>8.1.10.v20130312</version> + <scope>test</scope> + <type>jar</type> + </dependency> + <dependency> + <groupId>org.apache.httpcomponents</groupId> + <artifactId>httpclient</artifactId> + <version>4.3.5</version> + <type>jar</type> + </dependency> + </dependencies> + +</project> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mrmonitor/src/assemble/appPackage.xml ---------------------------------------------------------------------- diff --git a/examples/mrmonitor/src/assemble/appPackage.xml b/examples/mrmonitor/src/assemble/appPackage.xml new file mode 100644 index 0000000..4138cf2 --- /dev/null +++ b/examples/mrmonitor/src/assemble/appPackage.xml @@ -0,0 +1,59 @@ +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +--> +<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd"> + <id>appPackage</id> + <formats> + <format>jar</format> + </formats> + <includeBaseDirectory>false</includeBaseDirectory> + <fileSets> + <fileSet> + <directory>${basedir}/target/</directory> + <outputDirectory>/app</outputDirectory> + <includes> + <include>${project.artifactId}-${project.version}.jar</include> + </includes> + </fileSet> + <fileSet> + <directory>${basedir}/target/deps</directory> + <outputDirectory>/lib</outputDirectory> + </fileSet> + <fileSet> + <directory>${basedir}/src/site/conf</directory> + <outputDirectory>/conf</outputDirectory> + <includes> + <include>*.xml</include> + </includes> + </fileSet> + <fileSet> + <directory>${basedir}/src/main/resources/META-INF</directory> + <outputDirectory>/META-INF</outputDirectory> + </fileSet> + <fileSet> + <directory>${basedir}/src/main/resources/app</directory> + <outputDirectory>/app</outputDirectory> + </fileSet> + </fileSets> + +</assembly> + http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mrmonitor/src/main/java/org/apache/apex/examples/mrmonitor/Application.java ---------------------------------------------------------------------- diff --git a/examples/mrmonitor/src/main/java/org/apache/apex/examples/mrmonitor/Application.java b/examples/mrmonitor/src/main/java/org/apache/apex/examples/mrmonitor/Application.java new file mode 100644 index 0000000..0557919 --- /dev/null +++ b/examples/mrmonitor/src/main/java/org/apache/apex/examples/mrmonitor/Application.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.examples.mrmonitor; + +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.DAG; +import com.datatorrent.api.DAG.Locality; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.api.annotation.ApplicationAnnotation; +import com.datatorrent.lib.io.ConsoleOutputOperator; +import com.datatorrent.lib.testbench.SeedEventGenerator; + +/** + * Application + * + * @since 2.0.0 + */ +@ApplicationAnnotation(name = "MyFirstApplication") +public class Application implements StreamingApplication +{ + + @Override + public void populateDAG(DAG dag, Configuration conf) + { + // Sample DAG with 2 operators + // Replace this code with the DAG you want to build + + SeedEventGenerator seedGen = dag.addOperator("seedGen", SeedEventGenerator.class); + seedGen.setSeedStart(1); + seedGen.setSeedEnd(10); + seedGen.addKeyData("x", 0, 10); + seedGen.addKeyData("y", 0, 100); + + ConsoleOutputOperator cons = dag.addOperator("console", new ConsoleOutputOperator()); + cons.setStringFormat("hello: %s"); + + dag.addStream("seeddata", seedGen.val_list, cons.input).setLocality(Locality.CONTAINER_LOCAL); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mrmonitor/src/main/java/org/apache/apex/examples/mrmonitor/Constants.java ---------------------------------------------------------------------- diff --git a/examples/mrmonitor/src/main/java/org/apache/apex/examples/mrmonitor/Constants.java b/examples/mrmonitor/src/main/java/org/apache/apex/examples/mrmonitor/Constants.java new file mode 100644 index 0000000..1ed1f26 --- /dev/null +++ b/examples/mrmonitor/src/main/java/org/apache/apex/examples/mrmonitor/Constants.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.examples.mrmonitor; + +/** + * <p>Constants class.</p> + * + * @since 0.3.4 + */ +public interface Constants +{ + + public static final int MAX_NUMBER_OF_JOBS = 25; + + public static final String REDUCE_TASK_TYPE = "REDUCE"; + public static final String MAP_TASK_TYPE = "MAP"; + public static final String TASK_TYPE = "type"; + public static final String TASK_ID = "id"; + + public static final String LEAGACY_TASK_ID = "taskId"; + public static final int MAX_TASKS = 2000; + + public static final String QUERY_APP_ID = "app_id"; + public static final String QUERY_JOB_ID = "job_id"; + public static final String QUERY_HADOOP_VERSION = "hadoop_version"; + public static final String QUERY_API_VERSION = "api_version"; + public static final String QUERY_RM_PORT = "rm_port"; + public static final String QUERY_HS_PORT = "hs_port"; + public static final String QUERY_HOST_NAME = "hostname"; + public static final String QUERY_KEY_COMMAND = "command"; + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mrmonitor/src/main/java/org/apache/apex/examples/mrmonitor/MRJobStatusOperator.java ---------------------------------------------------------------------- diff --git a/examples/mrmonitor/src/main/java/org/apache/apex/examples/mrmonitor/MRJobStatusOperator.java b/examples/mrmonitor/src/main/java/org/apache/apex/examples/mrmonitor/MRJobStatusOperator.java new file mode 100644 index 0000000..24b3887 --- /dev/null +++ b/examples/mrmonitor/src/main/java/org/apache/apex/examples/mrmonitor/MRJobStatusOperator.java @@ -0,0 +1,622 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.examples.mrmonitor; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; + +import org.codehaus.jettison.json.JSONArray; +import org.codehaus.jettison.json.JSONException; +import org.codehaus.jettison.json.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.apex.examples.mrmonitor.MRStatusObject.TaskObject; + +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.Operator; +import com.datatorrent.api.Operator.IdleTimeHandler; + +/** + * <p> + * MRJobStatusOperator class. + * </p> + * + * @since 0.3.4 + */ +public class MRJobStatusOperator implements Operator, IdleTimeHandler +{ + private static final Logger LOG = LoggerFactory.getLogger(MRJobStatusOperator.class); + + private static final String JOB_PREFIX = "job_"; + /** + * This outputs the meta information of the job + */ + public final transient DefaultOutputPort<String> output = new DefaultOutputPort<String>(); + /** + * This outputs the map task information of the job + */ + public final transient DefaultOutputPort<String> mapOutput = new DefaultOutputPort<String>(); + /** + * This outputs the reduce task information of the job + */ + public final transient DefaultOutputPort<String> reduceOutput = new DefaultOutputPort<String>(); + /** + * This outputs the counter information of the job + */ + public final transient DefaultOutputPort<String> counterOutput = new DefaultOutputPort<String>(); + /** + * This is time in Ms before making new request for data + */ + private transient int sleepTime = 100; + /** + * This is the number of consecutive windows of no change before the job is removed from map + */ + private int maxRetrials = 10; + /** + * The number of minutes for which the status history of map and reduce tasks is stored + */ + private int statusHistoryTime = 60; + private Map<String, MRStatusObject> jobMap = new HashMap<String, MRStatusObject>(); + /** + * This represents the maximum number of jobs the single instance of this operator is going to server at any time + */ + private int maxJobs = Constants.MAX_NUMBER_OF_JOBS; + private transient Iterator<MRStatusObject> iterator; + + /* + * each input string is a map of the following format {"app_id":<>,"hadoop_version":<>,"api_version":<>,"command":<>, + * "hostname":<>,"hs_port":<>,"rm_port":<>,"job_id":<>} + */ + public final transient DefaultInputPort<MRStatusObject> input = new DefaultInputPort<MRStatusObject>() + { + @Override + public void process(MRStatusObject mrStatusObj) + { + + if (jobMap == null) { + jobMap = new HashMap<String, MRStatusObject>(); + } + + if (jobMap.size() >= maxJobs) { + return; + } + + if ("delete".equalsIgnoreCase(mrStatusObj.getCommand())) { + removeJob(mrStatusObj.getJobId()); + JSONObject outputJsonObject = new JSONObject(); + try { + outputJsonObject.put("id", mrStatusObj.getJobId()); + outputJsonObject.put("removed", "true"); + output.emit(outputJsonObject.toString()); + } catch (JSONException e) { + LOG.warn("Error creating JSON: {}", e.getMessage()); + } + return; + } + if ("clear".equalsIgnoreCase(mrStatusObj.getCommand())) { + clearMap(); + return; + } + + if (jobMap.get(mrStatusObj.getJobId()) != null) { + mrStatusObj = jobMap.get(mrStatusObj.getJobId()); + } + if (mrStatusObj.getHadoopVersion() == 2) { + getJsonForJob(mrStatusObj); + } else if (mrStatusObj.getHadoopVersion() == 1) { + getJsonForLegacyJob(mrStatusObj); + } + mrStatusObj.setStatusHistoryCount(statusHistoryTime); + iterator = jobMap.values().iterator(); + emitHelper(mrStatusObj); + } + }; + + public int getStatusHistoryTime() + { + return statusHistoryTime; + } + + public void setStatusHistoryTime(int statusHistoryTime) + { + this.statusHistoryTime = statusHistoryTime; + if (jobMap != null && jobMap.size() > 0) { + for (Entry<String, MRStatusObject> entry : jobMap.entrySet()) { + entry.getValue().setStatusHistoryCount(statusHistoryTime); + } + } + + } + + /** + * This method gets the latest status of the job from the Resource Manager for jobs submitted on hadoop 2.x version + * + * @param statusObj + */ + private void getJsonForJob(MRStatusObject statusObj) + { + + String url = "http://" + statusObj.getUri() + ":" + statusObj.getRmPort() + "/proxy/application_" + statusObj.getAppId() + "/ws/v1/mapreduce/jobs/job_" + statusObj.getJobId(); + String responseBody = MRUtil.getJsonForURL(url); + + JSONObject jsonObj = MRUtil.getJsonObject(responseBody); + + if (jsonObj == null) { + url = "http://" + statusObj.getUri() + ":" + statusObj.getHistoryServerPort() + "/ws/v1/history/mapreduce/jobs/job_" + statusObj.getJobId(); + responseBody = MRUtil.getJsonForURL(url); + jsonObj = MRUtil.getJsonObject(responseBody); + } + + if (jsonObj != null) { + if (jobMap.get(statusObj.getJobId()) != null) { + MRStatusObject tempObj = jobMap.get(statusObj.getJobId()); + if (tempObj.getJsonObject().toString().equals(jsonObj.toString())) { + getJsonsForTasks(statusObj); + getCounterInfoForJob(statusObj); + return; + } + } + statusObj.setModified(true); + statusObj.setJsonObject(jsonObj); + getCounterInfoForJob(statusObj); + getJsonsForTasks(statusObj); + jobMap.put(statusObj.getJobId(), statusObj); + } + } + + /** + * This method is used to collect the metric information about the job + * + * @param statusObj + */ + private void getCounterInfoForJob(MRStatusObject statusObj) + { + String url = "http://" + statusObj.getUri() + ":" + statusObj.getRmPort() + "/proxy/application_" + statusObj.getAppId() + "/ws/v1/mapreduce/jobs/job_" + statusObj.getJobId() + "/counters"; + String responseBody = MRUtil.getJsonForURL(url); + JSONObject jsonObj = MRUtil.getJsonObject(responseBody); + if (jsonObj == null) { + url = "http://" + statusObj.getUri() + ":" + statusObj.getHistoryServerPort() + "/ws/v1/history/mapreduce/jobs/job_" + statusObj.getJobId() + "/counters"; + responseBody = MRUtil.getJsonForURL(url); + jsonObj = MRUtil.getJsonObject(responseBody); + } + + if (jsonObj != null) { + if (statusObj.getMetricObject() == null) { + statusObj.setMetricObject(new TaskObject(jsonObj)); + } else if (!statusObj.getMetricObject().getJsonString().equalsIgnoreCase(jsonObj.toString())) { + statusObj.getMetricObject().setJson(jsonObj); + statusObj.getMetricObject().setModified(true); + } + } + } + + /** + * This method gets the latest status of the tasks for a job from the Resource Manager for jobs submitted on hadoop + * 2.x version + * + * @param statusObj + */ + private void getJsonsForTasks(MRStatusObject statusObj) + { + String url = "http://" + statusObj.getUri() + ":" + statusObj.getRmPort() + "/proxy/application_" + statusObj.getAppId() + "/ws/v1/mapreduce/jobs/job_" + statusObj.getJobId() + "/tasks/"; + String responseBody = MRUtil.getJsonForURL(url); + + JSONObject jsonObj = MRUtil.getJsonObject(responseBody); + if (jsonObj == null) { + url = "http://" + statusObj.getUri() + ":" + statusObj.getHistoryServerPort() + "/ws/v1/history/mapreduce/jobs/job_" + statusObj.getJobId() + "/tasks/"; + responseBody = MRUtil.getJsonForURL(url); + + jsonObj = MRUtil.getJsonObject(responseBody); + } + + if (jsonObj != null) { + + try { + Map<String, TaskObject> mapTaskOject = statusObj.getMapJsonObject(); + Map<String, TaskObject> reduceTaskOject = statusObj.getReduceJsonObject(); + JSONArray taskJsonArray = jsonObj.getJSONObject("tasks").getJSONArray("task"); + + for (int i = 0; i < taskJsonArray.length(); i++) { + JSONObject taskObj = taskJsonArray.getJSONObject(i); + if (Constants.REDUCE_TASK_TYPE.equalsIgnoreCase(taskObj.getString(Constants.TASK_TYPE))) { + if (reduceTaskOject.get(taskObj.getString(Constants.TASK_ID)) != null) { + TaskObject tempTaskObj = reduceTaskOject.get(taskObj.getString(Constants.TASK_ID)); + if (tempTaskObj.getJsonString().equals(taskObj.toString())) { + continue; + } + tempTaskObj.setJson(taskObj); + tempTaskObj.setModified(true); + reduceTaskOject.put(taskObj.getString(Constants.TASK_ID), tempTaskObj); + continue; + } + reduceTaskOject.put(taskObj.getString(Constants.TASK_ID), new TaskObject(taskObj)); + } else { + if (mapTaskOject.get(taskObj.getString(Constants.TASK_ID)) != null) { + TaskObject tempTaskObj = mapTaskOject.get(taskObj.getString(Constants.TASK_ID)); + if (tempTaskObj.getJsonString().equals(taskObj.toString())) { + continue; + } + tempTaskObj.setJson(taskObj); + tempTaskObj.setModified(true); + mapTaskOject.put(taskObj.getString(Constants.TASK_ID), tempTaskObj); + continue; + } + mapTaskOject.put(taskObj.getString(Constants.TASK_ID), new TaskObject(taskObj)); + } + } + statusObj.setMapJsonObject(mapTaskOject); + statusObj.setReduceJsonObject(reduceTaskOject); + } catch (Exception e) { + LOG.info("exception: {}", e.getMessage()); + } + } + + } + + /** + * This method gets the latest status of the job from the Task Manager for jobs submitted on hadoop 1.x version + * + * @param statusObj + */ + private void getJsonForLegacyJob(MRStatusObject statusObj) + { + + String url = "http://" + statusObj.getUri() + ":" + statusObj.getRmPort() + "/jobdetails.jsp?format=json&jobid=job_" + statusObj.getJobId(); + String responseBody = MRUtil.getJsonForURL(url); + + JSONObject jsonObj = MRUtil.getJsonObject(responseBody); + if (jsonObj == null) { + return; + } + + if (jobMap.get(statusObj.getJobId()) != null) { + MRStatusObject tempObj = jobMap.get(statusObj.getJobId()); + if (tempObj.getJsonObject().toString().equals(jsonObj.toString())) { + getJsonsForLegacyTasks(statusObj, "map"); + getJsonsForLegacyTasks(statusObj, "reduce"); + // output.emit(jsonObj.toString()); + // removeJob(statusObj.getJobId()); + return; + } + } + + // output.emit(jsonObj.toString()); + statusObj.setModified(true); + statusObj.setJsonObject(jsonObj); + getJsonsForLegacyTasks(statusObj, "map"); + getJsonsForLegacyTasks(statusObj, "reduce"); + jobMap.put(statusObj.getJobId(), statusObj); + + } + + /** + * This method gets the latest status of the tasks for a job from the Task Manager for jobs submitted on hadoop 1.x + * version + * + * @param statusObj + * @param type + */ + private void getJsonsForLegacyTasks(MRStatusObject statusObj, String type) + { + try { + JSONObject jobJson = statusObj.getJsonObject(); + int totalTasks = ((JSONObject)((JSONObject)jobJson.get(type + "TaskSummary")).get("taskStats")).getInt("numTotalTasks"); + Map<String, TaskObject> taskMap; + if (type.equalsIgnoreCase("map")) { + taskMap = statusObj.getMapJsonObject(); + } else { + taskMap = statusObj.getReduceJsonObject(); + } + + int totalPagenums = (totalTasks / Constants.MAX_TASKS) + 1; + String baseUrl = "http://" + statusObj.getUri() + ":" + statusObj.getRmPort() + "/jobtasks.jsp?type=" + type + "&format=json&jobid=job_" + statusObj.getJobId() + "&pagenum="; + + for (int pagenum = 1; pagenum <= totalPagenums; pagenum++) { + + String url = baseUrl + pagenum; + String responseBody = MRUtil.getJsonForURL(url); + + JSONObject jsonObj = MRUtil.getJsonObject(responseBody); + if (jsonObj == null) { + return; + } + + JSONArray taskJsonArray = jsonObj.getJSONArray("tasksInfo"); + + for (int i = 0; i < taskJsonArray.length(); i++) { + JSONObject taskObj = taskJsonArray.getJSONObject(i); + { + if (taskMap.get(taskObj.getString(Constants.LEAGACY_TASK_ID)) != null) { + TaskObject tempReduceObj = taskMap.get(taskObj.getString(Constants.LEAGACY_TASK_ID)); + if (tempReduceObj.getJsonString().equals(taskObj.toString())) { + // tempReduceObj.setModified(false); + // taskMap.put(taskObj.getString(Constants.TASK_ID), tempReduceObj); + continue; + } + tempReduceObj.setJson(taskObj); + tempReduceObj.setModified(true); + taskMap.put(taskObj.getString(Constants.TASK_ID), tempReduceObj); + continue; + + } + taskMap.put(taskObj.getString(Constants.LEAGACY_TASK_ID), new TaskObject(taskObj)); + } + } + } + + if (type.equalsIgnoreCase("map")) { + statusObj.setMapJsonObject(taskMap); + } else { + statusObj.setReduceJsonObject(taskMap); + } + } catch (Exception e) { + LOG.info(e.getMessage()); + } + + } + + @Override + public void handleIdleTime() + { + try { + Thread.sleep(sleepTime);// + } catch (InterruptedException ie) { + // If this thread was intrrupted by nother thread + } + if (!iterator.hasNext()) { + iterator = jobMap.values().iterator(); + } + + if (iterator.hasNext()) { + MRStatusObject obj = iterator.next(); + if (obj.getHadoopVersion() == 2) { + getJsonForJob(obj); + } else if (obj.getHadoopVersion() == 1) { + getJsonForLegacyJob(obj); + } + } + } + + @Override + public void setup(OperatorContext context) + { + iterator = jobMap.values().iterator(); + sleepTime = context.getValue(OperatorContext.SPIN_MILLIS); + } + + @Override + public void teardown() + { + } + + @Override + public void beginWindow(long arg0) + { + } + + private void emitHelper(MRStatusObject obj) + { + try { + obj.setModified(false); + output.emit(obj.getJsonObject().toString()); + JSONObject outputJsonObject = new JSONObject(); + + outputJsonObject.put("id", JOB_PREFIX + obj.getJobId()); + outputJsonObject.put("mapHistory", new JSONArray(obj.getMapStatusHistory())); + outputJsonObject.put("reduceHistory", new JSONArray(obj.getReduceStatusHistory())); + outputJsonObject.put("physicalMemoryHistory", new JSONArray(obj.getPhysicalMemeoryStatusHistory())); + outputJsonObject.put("virtualMemoryHistory", new JSONArray(obj.getVirtualMemoryStatusHistory())); + outputJsonObject.put("cpuHistory", new JSONArray(obj.getCpuStatusHistory())); + output.emit(outputJsonObject.toString()); + obj.setChangedHistoryStatus(false); + + outputJsonObject = new JSONObject(); + outputJsonObject.put("id", JOB_PREFIX + obj.getJobId()); + JSONArray arr = new JSONArray(); + + for (Map.Entry<String, TaskObject> mapEntry : obj.getMapJsonObject().entrySet()) { + TaskObject json = mapEntry.getValue(); + json.setModified(false); + arr.put(json.getJson()); + } + + outputJsonObject.put("tasks", arr); + mapOutput.emit(outputJsonObject.toString()); + + outputJsonObject = new JSONObject(); + outputJsonObject.put("id", JOB_PREFIX + obj.getJobId()); + arr = new JSONArray(); + + for (Map.Entry<String, TaskObject> mapEntry : obj.getReduceJsonObject().entrySet()) { + TaskObject json = mapEntry.getValue(); + json.setModified(false); + arr.put(json.getJson()); + } + + outputJsonObject.put("tasks", arr); + reduceOutput.emit(outputJsonObject.toString()); + obj.setRetrials(0); + } catch (Exception e) { + LOG.warn("error creating json {}", e.getMessage()); + } + + } + + @Override + public void endWindow() + { + List<String> delList = new ArrayList<String>(); + try { + for (Map.Entry<String, MRStatusObject> entry : jobMap.entrySet()) { + MRStatusObject obj = entry.getValue(); + + JSONObject outputJsonObject = new JSONObject(); + outputJsonObject.put("id", JOB_PREFIX + obj.getJobId()); + + boolean modified = false; + + if (obj.isModified()) { + modified = true; + obj.setModified(false); + output.emit(obj.getJsonObject().toString()); + if (obj.isChangedHistoryStatus()) { + outputJsonObject.put("mapHistory", new JSONArray(obj.getMapStatusHistory())); + outputJsonObject.put("reduceHistory", new JSONArray(obj.getReduceStatusHistory())); + outputJsonObject.put("physicalMemoryHistory", new JSONArray(obj.getPhysicalMemeoryStatusHistory())); + outputJsonObject.put("virtualMemoryHistory", new JSONArray(obj.getVirtualMemoryStatusHistory())); + outputJsonObject.put("cpuHistory", new JSONArray(obj.getCpuStatusHistory())); + output.emit(outputJsonObject.toString()); + obj.setChangedHistoryStatus(false); + } + } + outputJsonObject = new JSONObject(); + outputJsonObject.put("id", JOB_PREFIX + obj.getJobId()); + JSONArray arr = new JSONArray(); + + for (Map.Entry<String, TaskObject> mapEntry : obj.getMapJsonObject().entrySet()) { + TaskObject json = mapEntry.getValue(); + if (json.isModified()) { + modified = true; + json.setModified(false); + arr.put(json.getJson()); + } + } + + if (arr.length() > 0) { + outputJsonObject.put("tasks", arr); + mapOutput.emit(outputJsonObject.toString()); + } + + outputJsonObject = new JSONObject(); + outputJsonObject.put("id", JOB_PREFIX + obj.getJobId()); + arr = new JSONArray(); + + for (Map.Entry<String, TaskObject> mapEntry : obj.getReduceJsonObject().entrySet()) { + TaskObject json = mapEntry.getValue(); + if (json.isModified()) { + modified = true; + json.setModified(false); + arr.put(json.getJson()); + } + } + if (arr.length() > 0) { + outputJsonObject.put("tasks", arr); + reduceOutput.emit(outputJsonObject.toString()); + } + + if (obj.getMetricObject() != null && obj.getMetricObject().isModified()) { + modified = true; + obj.getMetricObject().setModified(false); + counterOutput.emit(obj.getMetricObject().getJsonString()); + } + + if (!modified) { + if (obj.getRetrials() >= maxRetrials) { + delList.add(obj.getJobId()); + } else { + obj.setRetrials(obj.getRetrials() + 1); + } + } else { + obj.setRetrials(0); + } + } + } catch (Exception ex) { + LOG.warn("error creating json {}", ex.getMessage()); + } + + if (!delList.isEmpty()) { + Iterator<String> itr = delList.iterator(); + while (itr.hasNext()) { + removeJob(itr.next()); + } + } + + } + + /** + * This method removes the job from the map + * + * @param jobId + */ + public void removeJob(String jobId) + { + if (jobMap != null) { + jobMap.remove(jobId); + iterator = jobMap.values().iterator(); + } + } + + /** + * This method clears the job map + */ + public void clearMap() + { + if (jobMap != null) { + jobMap.clear(); + iterator = jobMap.values().iterator(); + } + } + + /** + * This returns the maximum number of jobs the single instance of this operator is going to server at any time + * + * @return + */ + public int getMaxJobs() + { + return maxJobs; + } + + /** + * This sets the maximum number of jobs the single instance of this operator is going to server at any time + * + * @param maxJobs + */ + public void setMaxJobs(int maxJobs) + { + this.maxJobs = maxJobs; + } + + /** + * This sets the number of consecutive windows of no change before the job is removed from map + * + * @return + */ + public int getMaxRetrials() + { + return maxRetrials; + } + + /** + * This returns the number of consecutive windows of no change before the job is removed from map + * + * @param maxRetrials + */ + public void setMaxRetrials(int maxRetrials) + { + this.maxRetrials = maxRetrials; + } + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mrmonitor/src/main/java/org/apache/apex/examples/mrmonitor/MRMonitoringApplication.java ---------------------------------------------------------------------- diff --git a/examples/mrmonitor/src/main/java/org/apache/apex/examples/mrmonitor/MRMonitoringApplication.java b/examples/mrmonitor/src/main/java/org/apache/apex/examples/mrmonitor/MRMonitoringApplication.java new file mode 100644 index 0000000..288da84 --- /dev/null +++ b/examples/mrmonitor/src/main/java/org/apache/apex/examples/mrmonitor/MRMonitoringApplication.java @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.examples.mrmonitor; + +import java.net.URI; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.DAG; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.api.annotation.ApplicationAnnotation; + +import com.datatorrent.lib.io.PubSubWebSocketInputOperator; +import com.datatorrent.lib.io.PubSubWebSocketOutputOperator; + +/** + * <p> + * MRDebuggerApplication class. + * </p> + * + * @since 0.3.4 + */ +@ApplicationAnnotation(name = "MRMonitoringExample") +public class MRMonitoringApplication implements StreamingApplication +{ + + private static final Logger logger = LoggerFactory.getLogger(MRMonitoringApplication.class); + + @Override + public void populateDAG(DAG dag, Configuration conf) + { + String daemonAddress = dag.getValue(DAG.GATEWAY_CONNECT_ADDRESS); + MRJobStatusOperator mrJobOperator = dag.addOperator("JobMonitor", new MRJobStatusOperator()); + URI uri = URI.create("ws://" + daemonAddress + "/pubsub"); + logger.info("WebSocket with daemon at {}", daemonAddress); + + PubSubWebSocketInputOperator wsIn = dag.addOperator("Query", new PubSubWebSocketInputOperator()); + wsIn.setUri(uri); + + MapToMRObjectOperator queryConverter = dag.addOperator("QueryConverter", new MapToMRObjectOperator()); + + /** + * This is used to emit the meta data about the job + */ + PubSubWebSocketOutputOperator<Object> wsOut = dag.addOperator("JobOutput", new PubSubWebSocketOutputOperator<Object>()); + wsOut.setUri(uri); + + /** + * This is used to emit the information of map tasks of the job + */ + PubSubWebSocketOutputOperator<Object> wsMapOut = dag.addOperator("MapJob", new PubSubWebSocketOutputOperator<Object>()); + wsMapOut.setUri(uri); + + /** + * This is used to emit the information of reduce tasks of the job + */ + PubSubWebSocketOutputOperator<Object> wsReduceOut = dag.addOperator("ReduceJob", new PubSubWebSocketOutputOperator<Object>()); + wsReduceOut.setUri(uri); + + /** + * This is used to emit the metric information of the job + */ + PubSubWebSocketOutputOperator<Object> wsCounterOut = dag.addOperator("JobCounter", new PubSubWebSocketOutputOperator<Object>()); + wsCounterOut.setUri(uri); + + dag.addStream("QueryConversion", wsIn.outputPort, queryConverter.input); + dag.addStream("QueryProcessing", queryConverter.output, mrJobOperator.input); + dag.addStream("JobData", mrJobOperator.output, wsOut.input); + dag.addStream("MapData", mrJobOperator.mapOutput, wsMapOut.input); + dag.addStream("ReduceData", mrJobOperator.reduceOutput, wsReduceOut.input); + dag.addStream("CounterData", mrJobOperator.counterOutput, wsCounterOut.input); + } + +}
