http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/mobile/pom.xml ---------------------------------------------------------------------- diff --git a/demos/mobile/pom.xml b/demos/mobile/pom.xml deleted file mode 100644 index 1d2c66c..0000000 --- a/demos/mobile/pom.xml +++ /dev/null @@ -1,64 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, - software distributed under the License is distributed on an - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - KIND, either express or implied. See the License for the - specific language governing permissions and limitations - under the License. - ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - - <artifactId>mobile-demo</artifactId> - <packaging>jar</packaging> - - <name>Apache Apex Malhar Mobile Demo</name> - <description></description> - - <parent> - <groupId>org.apache.apex</groupId> - <artifactId>malhar-demos</artifactId> - <version>3.7.0-SNAPSHOT</version> - </parent> - - <properties> - <skipTests>true</skipTests> - </properties> - - <dependencies> - <!-- add your dependencies here --> - <dependency> - <groupId>org.eclipse.jetty</groupId> - <artifactId>jetty-servlet</artifactId> - <version>8.1.10.v20130312</version> - <scope>test</scope> - <type>jar</type> - </dependency> - <dependency> - <groupId>org.eclipse.jetty</groupId> - <artifactId>jetty-websocket</artifactId> - <version>8.1.10.v20130312</version> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.apache.commons</groupId> - <artifactId>commons-lang3</artifactId> - <version>3.1</version> - <type>jar</type> - </dependency> - </dependencies> - -</project>
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/mobile/src/assemble/appPackage.xml ---------------------------------------------------------------------- diff --git a/demos/mobile/src/assemble/appPackage.xml b/demos/mobile/src/assemble/appPackage.xml deleted file mode 100644 index 4138cf2..0000000 --- a/demos/mobile/src/assemble/appPackage.xml +++ /dev/null @@ -1,59 +0,0 @@ -<!-- - - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, - software distributed under the License is distributed on an - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - KIND, either express or implied. See the License for the - specific language governing permissions and limitations - under the License. - ---> -<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd"> - <id>appPackage</id> - <formats> - <format>jar</format> - </formats> - <includeBaseDirectory>false</includeBaseDirectory> - <fileSets> - <fileSet> - <directory>${basedir}/target/</directory> - <outputDirectory>/app</outputDirectory> - <includes> - <include>${project.artifactId}-${project.version}.jar</include> - </includes> - </fileSet> - <fileSet> - <directory>${basedir}/target/deps</directory> - <outputDirectory>/lib</outputDirectory> - </fileSet> - <fileSet> - <directory>${basedir}/src/site/conf</directory> - <outputDirectory>/conf</outputDirectory> - <includes> - <include>*.xml</include> - </includes> - </fileSet> - <fileSet> - <directory>${basedir}/src/main/resources/META-INF</directory> - <outputDirectory>/META-INF</outputDirectory> - </fileSet> - <fileSet> - <directory>${basedir}/src/main/resources/app</directory> - <outputDirectory>/app</outputDirectory> - </fileSet> - </fileSets> - -</assembly> - http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/mobile/src/main/java/com/datatorrent/demos/mobile/Application.java ---------------------------------------------------------------------- diff --git a/demos/mobile/src/main/java/com/datatorrent/demos/mobile/Application.java b/demos/mobile/src/main/java/com/datatorrent/demos/mobile/Application.java deleted file mode 100644 index 30d7281..0000000 --- a/demos/mobile/src/main/java/com/datatorrent/demos/mobile/Application.java +++ /dev/null @@ -1,172 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.demos.mobile; - -import java.net.URI; -import java.util.Arrays; -import java.util.Map; -import java.util.Random; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.commons.lang.mutable.MutableLong; -import org.apache.commons.lang3.Range; -import org.apache.hadoop.conf.Configuration; - -import com.datatorrent.api.Context.OperatorContext; -import com.datatorrent.api.DAG; -import com.datatorrent.api.StatsListener; -import com.datatorrent.api.StreamingApplication; -import com.datatorrent.api.annotation.ApplicationAnnotation; -import com.datatorrent.lib.counters.BasicCounters; -import com.datatorrent.lib.io.PubSubWebSocketInputOperator; -import com.datatorrent.lib.io.PubSubWebSocketOutputOperator; -import com.datatorrent.lib.partitioner.StatelessThroughputBasedPartitioner; -import com.datatorrent.lib.testbench.RandomEventGenerator; - -/** - * Mobile Demo Application: - * <p> - * This demo simulates large number of cell phones in the range of 40K to 200K - * and tracks a given cell number across cell towers. It also displays the changing locations of the cell number on a google map. - * - * This demo demonstrates the scalability feature of the Apex platform. - * It showcases the ability of the platform to scale up and down as the phone numbers generated increase and decrease respectively. - * If the tuples processed per second by the pmove operator increase beyond 30,000, more partitions of the pmove operator gets deployed until - * each of the partition processes around 10000 to 30000 tuples per second. - * If the tuples processed per second drops below 10,000, the platform merges the operators until the partition count drops down to the original. - * The load can be varied using the tuplesBlast property. - * If the tuplesBlast is set to 200, 40K cell phones are generated. - * If the tuplesBlast is set to 1000, 200K cell phones are generated. - * The tuplesBlast property can be set using dtcli command: 'set-operator-property pmove tuplesBlast 1000'. - * - * - * The specs are as such<br> - * Depending on the tuplesBlast property, large number of cell phone numbers are generated. - * They jump a cell tower frequently. Sometimes - * within a second sometimes in 10 seconds. The aim is to demonstrate the - * following abilities<br> - * <ul> - * <li>Entering query dynamically: The phone numbers are added to locate its gps - * in run time.</li> - * <li>Changing functionality dynamically: The load is changed by making - * functional changes on the load generator operator (phonegen)(</li> - * <li>Auto Scale up/Down with load: Operator pmove increases and decreases - * partitions as per load</li> - * <li></li> - * </ul> - * - * Refer to demos/docs/MobileDemo.md for more information. - * - * <p> - * - * Running Java Test or Main app in IDE: - * - * <pre> - * LocalMode.runApp(new Application(), 600000); // 10 min run - * </pre> - * - * Run Success : <br> - * For successful deployment and run, user should see following output on - * console: <br> - * - * <pre> - * phoneLocationQueryResult: {phone=5556101, location=(5,9), queryId=q3} - * phoneLocationQueryResult: {phone=5554995, location=(10,4), queryId=q1} - * phoneLocationQueryResult: {phone=5556101, location=(5,9), queryId=q3} - * phoneLocationQueryResult: {phone=5554995, location=(10,4), queryId=q1} - * phoneLocationQueryResult: {phone=5554995, location=(10,5), queryId=q1} - * phoneLocationQueryResult: {phone=5556101, location=(5,9), queryId=q3} - * phoneLocationQueryResult: {phone=5554995, location=(9,5), queryId=q1} - * phoneLocationQueryResult: {phone=5556101, location=(5,9), queryId=q3} - * phoneLocationQueryResult: {phone=5556101, location=(5,9), queryId=q3} - * phoneLocationQueryResult: {phone=5554995, location=(9,5), queryId=q1} - * phoneLocationQueryResult: {phone=5554995, location=(9,5), queryId=q1} - * phoneLocationQueryResult: {phone=5556101, location=(5,9), queryId=q3} - * </pre> - * - * * <b>Application DAG : </b><br> - * <img src="doc-files/mobile.png" width=600px > <br> - * - * @since 0.3.2 - */ -@ApplicationAnnotation(name = "MobileDemo") -public class Application implements StreamingApplication -{ - public static final String PHONE_RANGE_PROP = "dt.application.MobileDemo.phoneRange"; - public static final String TOTAL_SEED_NOS = "dt.application.MobileDemo.totalSeedNumbers"; - public static final String COOL_DOWN_MILLIS = "dt.application.MobileDemo.coolDownMillis"; - public static final String MAX_THROUGHPUT = "dt.application.MobileDemo.maxThroughput"; - public static final String MIN_THROUGHPUT = "dt.application.MobileDemo.minThroughput"; - private static final Logger LOG = LoggerFactory.getLogger(Application.class); - private Range<Integer> phoneRange = Range.between(5550000, 5559999); - - @Override - public void populateDAG(DAG dag, Configuration conf) - { - String lPhoneRange = conf.get(PHONE_RANGE_PROP, null); - if (lPhoneRange != null) { - String[] tokens = lPhoneRange.split("-"); - if (tokens.length != 2) { - throw new IllegalArgumentException("Invalid range: " + lPhoneRange); - } - this.phoneRange = Range.between(Integer.parseInt(tokens[0]), Integer.parseInt(tokens[1])); - } - LOG.debug("Phone range {}", this.phoneRange); - - RandomEventGenerator phones = dag.addOperator("Receiver", RandomEventGenerator.class); - phones.setMinvalue(this.phoneRange.getMinimum()); - phones.setMaxvalue(this.phoneRange.getMaximum()); - - PhoneMovementGenerator movementGen = dag.addOperator("LocationFinder", PhoneMovementGenerator.class); - dag.setAttribute(movementGen, OperatorContext.COUNTERS_AGGREGATOR, new BasicCounters.LongAggregator<MutableLong>()); - - StatelessThroughputBasedPartitioner<PhoneMovementGenerator> partitioner = new StatelessThroughputBasedPartitioner<PhoneMovementGenerator>(); - partitioner.setCooldownMillis(conf.getLong(COOL_DOWN_MILLIS, 45000)); - partitioner.setMaximumEvents(conf.getLong(MAX_THROUGHPUT, 30000)); - partitioner.setMinimumEvents(conf.getLong(MIN_THROUGHPUT, 10000)); - dag.setAttribute(movementGen, OperatorContext.STATS_LISTENERS, Arrays.asList(new StatsListener[]{partitioner})); - dag.setAttribute(movementGen, OperatorContext.PARTITIONER, partitioner); - - // generate seed numbers - Random random = new Random(); - int maxPhone = phoneRange.getMaximum() - phoneRange.getMinimum(); - int phonesToDisplay = conf.getInt(TOTAL_SEED_NOS, 10); - for (int i = phonesToDisplay; i-- > 0; ) { - int phoneNo = phoneRange.getMinimum() + random.nextInt(maxPhone + 1); - LOG.info("seed no: " + phoneNo); - movementGen.phoneRegister.add(phoneNo); - } - // done generating data - LOG.info("Finished generating seed data."); - - String gatewayAddress = dag.getValue(DAG.GATEWAY_CONNECT_ADDRESS); - URI uri = URI.create("ws://" + gatewayAddress + "/pubsub"); - PubSubWebSocketOutputOperator<Object> wsOut = dag.addOperator("LocationResults", new PubSubWebSocketOutputOperator<Object>()); - wsOut.setUri(uri); - PubSubWebSocketInputOperator<Map<String, String>> wsIn = dag.addOperator("QueryLocation", new PubSubWebSocketInputOperator<Map<String, String>>()); - wsIn.setUri(uri); - // default partitioning: first connected stream to movementGen will be partitioned - dag.addStream("Phone-Data", phones.integer_data, movementGen.data); - dag.addStream("Results", movementGen.locationQueryResult, wsOut.input); - dag.addStream("Query", wsIn.outputPort, movementGen.phoneQuery); - } - -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/mobile/src/main/java/com/datatorrent/demos/mobile/PhoneEntryOperator.java ---------------------------------------------------------------------- diff --git a/demos/mobile/src/main/java/com/datatorrent/demos/mobile/PhoneEntryOperator.java b/demos/mobile/src/main/java/com/datatorrent/demos/mobile/PhoneEntryOperator.java deleted file mode 100644 index f6708ba..0000000 --- a/demos/mobile/src/main/java/com/datatorrent/demos/mobile/PhoneEntryOperator.java +++ /dev/null @@ -1,124 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.demos.mobile; - -import java.util.Map; -import java.util.Random; -import javax.validation.constraints.Min; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.collect.Maps; -import com.google.common.collect.Range; - -import com.datatorrent.api.DefaultInputPort; -import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.api.annotation.InputPortFieldAnnotation; -import com.datatorrent.common.util.BaseOperator; - -/** - * Generates mobile numbers that will be displayed in mobile demo just after launch.<br></br> - * Operator attributes:<b> - * <ul> - * <li>initialDisplayCount: No. of seed phone numbers that will be generated.</li> - * <li>maxSeedPhoneNumber: The largest seed phone number.</li> - * </ul> - * </b> - * - * @since 0.3.5 - */ -public class PhoneEntryOperator extends BaseOperator -{ - private static Logger LOG = LoggerFactory.getLogger(PhoneEntryOperator.class); - - private boolean seedGenerationDone = false; - - @Min(0) - private int initialDisplayCount = 0; - - private int maxSeedPhoneNumber = 0; - private int rangeLowerEndpoint; - private int rangeUpperEndpoint; - - /** - * Sets the initial number of phones to display on the google map. - * - * @param i the count of initial phone numbers to display - */ - public void setInitialDisplayCount(int i) - { - initialDisplayCount = i; - } - - /** - * Sets the range for the phone numbers generated by the operator. - * - * @param i the range within which the phone numbers are randomly generated. - */ - public void setPhoneRange(Range<Integer> phoneRange) - { - this.rangeLowerEndpoint = phoneRange.lowerEndpoint(); - this.rangeUpperEndpoint = phoneRange.upperEndpoint(); - } - - /** - * Sets the max seed for random phone number generation - * - * @param i the number to initialize the random number phone generator. - */ - public void setMaxSeedPhoneNumber(int number) - { - this.maxSeedPhoneNumber = number; - } - - @InputPortFieldAnnotation(optional = true) - public final transient DefaultInputPort<Map<String, String>> locationQuery = new DefaultInputPort<Map<String, String>>() - { - @Override - public void process(Map<String, String> tuple) - { - seedPhones.emit(tuple); - } - }; - - public final transient DefaultOutputPort<Map<String, String>> seedPhones = new DefaultOutputPort<Map<String, String>>(); - - @Override - public void beginWindow(long windowId) - { - if (!seedGenerationDone) { - Random random = new Random(); - int maxPhone = (maxSeedPhoneNumber <= rangeUpperEndpoint && maxSeedPhoneNumber >= rangeLowerEndpoint) ? maxSeedPhoneNumber : rangeUpperEndpoint; - maxPhone -= 5550000; - int phonesToDisplay = initialDisplayCount > maxPhone ? maxPhone : initialDisplayCount; - for (int i = phonesToDisplay; i-- > 0; ) { - int phoneNo = 5550000 + random.nextInt(maxPhone + 1); - LOG.info("seed no: " + phoneNo); - Map<String, String> valueMap = Maps.newHashMap(); - valueMap.put(PhoneMovementGenerator.KEY_COMMAND, PhoneMovementGenerator.COMMAND_ADD); - valueMap.put(PhoneMovementGenerator.KEY_PHONE, Integer.toString(phoneNo)); - seedPhones.emit(valueMap); - } - // done generating data - seedGenerationDone = true; - LOG.info("Finished generating seed data."); - } - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/mobile/src/main/java/com/datatorrent/demos/mobile/PhoneMovementGenerator.java ---------------------------------------------------------------------- diff --git a/demos/mobile/src/main/java/com/datatorrent/demos/mobile/PhoneMovementGenerator.java b/demos/mobile/src/main/java/com/datatorrent/demos/mobile/PhoneMovementGenerator.java deleted file mode 100644 index a46e6d4..0000000 --- a/demos/mobile/src/main/java/com/datatorrent/demos/mobile/PhoneMovementGenerator.java +++ /dev/null @@ -1,335 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.demos.mobile; - -import java.util.HashMap; -import java.util.Map; -import java.util.Random; -import java.util.Set; - -import javax.validation.constraints.Min; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.commons.lang.mutable.MutableLong; - -import com.google.common.base.Strings; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; - -import com.datatorrent.api.Context.OperatorContext; -import com.datatorrent.api.DefaultInputPort; -import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.api.annotation.InputPortFieldAnnotation; -import com.datatorrent.common.util.BaseOperator; -import com.datatorrent.lib.counters.BasicCounters; -import com.datatorrent.lib.util.HighLow; - -/** - * <p> - * This operator generates the GPS locations for the phone numbers specified. - * The range of phone numbers or a specific phone number can be set for which the GPS locations will be generated. - * It supports querying the locations of a given phone number. - * This is a partionable operator that can partition as the tuplesBlast increases. - * </p> - * - * @since 0.3.2 - */ -public class PhoneMovementGenerator extends BaseOperator -{ - public final transient DefaultInputPort<Integer> data = new DefaultInputPort<Integer>() - { - @Override - public void process(Integer tuple) - { - HighLow<Integer> loc = gps.get(tuple); - if (loc == null) { - loc = new HighLow<Integer>(random.nextInt(range), random.nextInt(range)); - gps.put(tuple, loc); - } - int xloc = loc.getHigh(); - int yloc = loc.getLow(); - int state = rotate % 4; - - // Compute new location - int delta = random.nextInt(100); - if (delta >= threshold) { - if (state < 2) { - xloc++; - } else { - xloc--; - } - if (xloc < 0) { - xloc += range; - } - } - delta = random.nextInt(100); - if (delta >= threshold) { - if ((state == 1) || (state == 3)) { - yloc++; - } else { - yloc--; - } - if (yloc < 0) { - yloc += range; - } - } - xloc %= range; - yloc %= range; - - // Set new location - HighLow<Integer> nloc = newgps.get(tuple); - if (nloc == null) { - newgps.put(tuple, new HighLow<Integer>(xloc, yloc)); - } else { - nloc.setHigh(xloc); - nloc.setLow(yloc); - } - rotate++; - } - }; - - @InputPortFieldAnnotation(optional = true) - public final transient DefaultInputPort<Map<String,String>> phoneQuery = new DefaultInputPort<Map<String,String>>() - { - @Override - public void process(Map<String,String> tuple) - { - LOG.info("new query {}", tuple); - String command = tuple.get(KEY_COMMAND); - if (command != null) { - if (command.equals(COMMAND_ADD)) { - commandCounters.getCounter(CommandCounters.ADD).increment(); - String phoneStr = tuple.get(KEY_PHONE); - registerPhone(phoneStr); - } else if (command.equals(COMMAND_ADD_RANGE)) { - commandCounters.getCounter(CommandCounters.ADD_RANGE).increment(); - registerPhoneRange(tuple.get(KEY_START_PHONE), tuple.get(KEY_END_PHONE)); - } else if (command.equals(COMMAND_DELETE)) { - commandCounters.getCounter(CommandCounters.DELETE).increment(); - String phoneStr = tuple.get(KEY_PHONE); - deregisterPhone(phoneStr); - } else if (command.equals(COMMAND_CLEAR)) { - commandCounters.getCounter(CommandCounters.CLEAR).increment(); - clearPhones(); - } - } - } - }; - - public static final String KEY_COMMAND = "command"; - public static final String KEY_PHONE = "phone"; - public static final String KEY_LOCATION = "location"; - public static final String KEY_REMOVED = "removed"; - public static final String KEY_START_PHONE = "startPhone"; - public static final String KEY_END_PHONE = "endPhone"; - - public static final String COMMAND_ADD = "add"; - public static final String COMMAND_ADD_RANGE = "addRange"; - public static final String COMMAND_DELETE = "del"; - public static final String COMMAND_CLEAR = "clear"; - - final Set<Integer> phoneRegister = Sets.newHashSet(); - - private final transient HashMap<Integer, HighLow<Integer>> gps = new HashMap<Integer, HighLow<Integer>>(); - private final Random random = new Random(); - private int range = 50; - private int threshold = 80; - private int rotate = 0; - - protected BasicCounters<MutableLong> commandCounters; - - private transient OperatorContext context; - private final transient HashMap<Integer, HighLow<Integer>> newgps = new HashMap<Integer, HighLow<Integer>>(); - - public PhoneMovementGenerator() - { - this.commandCounters = new BasicCounters<MutableLong>(MutableLong.class); - } - - /** - * @return the range of the phone numbers - */ - @Min(0) - public int getRange() - { - return range; - } - - /** - * Sets the range of phone numbers for which the GPS locations need to be generated. - * - * @param i the range of phone numbers to set - */ - public void setRange(int i) - { - range = i; - } - - /** - * @return the threshold - */ - @Min(0) - public int getThreshold() - { - return threshold; - } - - /** - * Sets the threshold that decides how frequently the GPS locations are updated. - * - * @param i the value that decides how frequently the GPS locations change. - */ - public void setThreshold(int i) - { - threshold = i; - } - - private void registerPhone(String phoneStr) - { - // register the phone channel - if (Strings.isNullOrEmpty(phoneStr)) { - return; - } - try { - Integer phone = new Integer(phoneStr); - registerSinglePhone(phone); - } catch (NumberFormatException nfe) { - LOG.warn("Invalid no {}", phoneStr); - } - } - - private void registerPhoneRange(String startPhoneStr, String endPhoneStr) - { - if (Strings.isNullOrEmpty(startPhoneStr) || Strings.isNullOrEmpty(endPhoneStr)) { - LOG.warn("Invalid phone range {} {}", startPhoneStr, endPhoneStr); - return; - } - try { - Integer startPhone = new Integer(startPhoneStr); - Integer endPhone = new Integer(endPhoneStr); - if (endPhone < startPhone) { - LOG.warn("Invalid phone range {} {}", startPhone, endPhone); - return; - } - for (int i = startPhone; i <= endPhone; i++) { - registerSinglePhone(i); - } - } catch (NumberFormatException nfe) { - LOG.warn("Invalid phone range <{},{}>", startPhoneStr, endPhoneStr); - } - } - - private void registerSinglePhone(int phone) - { - phoneRegister.add(phone); - LOG.debug("Registered query id with phone {}", phone); - emitQueryResult(phone); - } - - private void deregisterPhone(String phoneStr) - { - if (Strings.isNullOrEmpty(phoneStr)) { - return; - } - try { - Integer phone = new Integer(phoneStr); - // remove the channel - if (phoneRegister.contains(phone)) { - phoneRegister.remove(phone); - LOG.debug("Removing query id {}", phone); - emitPhoneRemoved(phone); - } - } catch (NumberFormatException nfe) { - LOG.warn("Invalid phone {}", phoneStr); - } - } - - private void clearPhones() - { - phoneRegister.clear(); - LOG.info("Clearing phones"); - } - - public final transient DefaultOutputPort<Map<String, String>> locationQueryResult = new DefaultOutputPort<Map<String, String>>(); - - @Override - public void setup(OperatorContext context) - { - this.context = context; - commandCounters.setCounter(CommandCounters.ADD, new MutableLong()); - commandCounters.setCounter(CommandCounters.ADD_RANGE, new MutableLong()); - commandCounters.setCounter(CommandCounters.DELETE, new MutableLong()); - commandCounters.setCounter(CommandCounters.CLEAR, new MutableLong()); - } - - /** - * Emit all the data and clear the hash - */ - @Override - public void endWindow() - { - for (Map.Entry<Integer, HighLow<Integer>> e: newgps.entrySet()) { - HighLow<Integer> loc = gps.get(e.getKey()); - if (loc == null) { - gps.put(e.getKey(), e.getValue()); - } else { - loc.setHigh(e.getValue().getHigh()); - loc.setLow(e.getValue().getLow()); - } - } - boolean found = false; - for (Integer phone: phoneRegister) { - emitQueryResult( phone); - found = true; - } - if (!found) { - LOG.debug("No phone number"); - } - newgps.clear(); - context.setCounters(commandCounters); - } - - private void emitQueryResult(Integer phone) - { - HighLow<Integer> loc = gps.get(phone); - if (loc != null) { - Map<String, String> queryResult = new HashMap<String, String>(); - queryResult.put(KEY_PHONE, String.valueOf(phone)); - queryResult.put(KEY_LOCATION, loc.toString()); - locationQueryResult.emit(queryResult); - } - } - - private void emitPhoneRemoved(Integer phone) - { - Map<String,String> removedResult = Maps.newHashMap(); - removedResult.put(KEY_PHONE, String.valueOf(phone)); - removedResult.put(KEY_REMOVED,"true"); - locationQueryResult.emit(removedResult); - } - - public static enum CommandCounters - { - ADD, ADD_RANGE, DELETE, CLEAR - } - - private static final Logger LOG = LoggerFactory.getLogger(PhoneMovementGenerator.class); -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/mobile/src/main/java/com/datatorrent/demos/mobile/doc-files/Mobile.png ---------------------------------------------------------------------- diff --git a/demos/mobile/src/main/java/com/datatorrent/demos/mobile/doc-files/Mobile.png b/demos/mobile/src/main/java/com/datatorrent/demos/mobile/doc-files/Mobile.png deleted file mode 100644 index a25da0d..0000000 Binary files a/demos/mobile/src/main/java/com/datatorrent/demos/mobile/doc-files/Mobile.png and /dev/null differ http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/mobile/src/main/java/com/datatorrent/demos/mobile/package-info.java ---------------------------------------------------------------------- diff --git a/demos/mobile/src/main/java/com/datatorrent/demos/mobile/package-info.java b/demos/mobile/src/main/java/com/datatorrent/demos/mobile/package-info.java deleted file mode 100644 index 378e7a0..0000000 --- a/demos/mobile/src/main/java/com/datatorrent/demos/mobile/package-info.java +++ /dev/null @@ -1,22 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -/** - * Mobile phones tracking demonstration application. - */ -package com.datatorrent.demos.mobile; http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/mobile/src/main/resources/META-INF/properties.xml ---------------------------------------------------------------------- diff --git a/demos/mobile/src/main/resources/META-INF/properties.xml b/demos/mobile/src/main/resources/META-INF/properties.xml deleted file mode 100644 index 247bd82..0000000 --- a/demos/mobile/src/main/resources/META-INF/properties.xml +++ /dev/null @@ -1,82 +0,0 @@ -<!-- - - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, - software distributed under the License is distributed on an - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - KIND, either express or implied. See the License for the - specific language governing permissions and limitations - under the License. - ---> -<configuration> - <property> - <name>dt.application.MobileDemo.coolDownMillis</name> - <value>45000</value> - </property> - <property> - <name>dt.application.MobileDemo.maxThroughput</name> - <value>30000</value> - </property> - <property> - <name>dt.application.MobileDemo.minThroughput</name> - <value>10000</value> - </property> - <property> - <name>dt.application.MobileDemo.operator.Receiver.tuplesBlast</name> - <value>200</value> - </property> - <property> - <name>dt.application.MobileDemo.operator.Receiver.tuplesBlastIntervalMillis</name> - <value>5</value> - </property> - <property> - <name>dt.application.MobileDemo.operator.Receiver.outputport.integer_data.attr.QUEUE_CAPACITY</name> - <value>32768</value> - </property> - <property> - <name>dt.application.MobileDemo.operator.LocationFinder.range</name> - <value>20</value> - </property> - <property> - <name>dt.application.MobileDemo.operator.LocationFinder.threshold</name> - <value>80</value> - </property> - <property> - <name>dt.application.MobileDemo.operator.LocationFinder.inputport.data.attr.QUEUE_CAPACITY</name> - <value>32768</value> - </property> - <property> - <name>dt.application.MobileDemo.operator.LocationResults.prop.topic</name> - <value>demos.mobile.phoneLocationQueryResult</value> - </property> - <property> - <name>dt.application.MobileDemo.operator.QueryLocation.prop.topic</name> - <value>demos.mobile.phoneLocationQuery</value> - </property> - <property> - <name>dt.application.MobileDemo.operator.*.attr.MEMORY_MB</name> - <value>768</value> - </property> - <property> - <name>dt.application.MobileDemo.operator.*.attr.JVM_OPTIONS</name> - <value>-Xmx128m</value> - </property> - <property> - <name>dt.application.MobileDemo.operator.*.port.*.attr.BUFFER_MEMORY_MB</name> - <value>256</value> - </property> - <property> - <name>dt.application.MobileDemo.attr.MASTER_MEMORY_MB</name> - <value>1024</value> - </property> -</configuration> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/mobile/src/site/conf/my-app-conf1.xml ---------------------------------------------------------------------- diff --git a/demos/mobile/src/site/conf/my-app-conf1.xml b/demos/mobile/src/site/conf/my-app-conf1.xml deleted file mode 100644 index f35873b..0000000 --- a/demos/mobile/src/site/conf/my-app-conf1.xml +++ /dev/null @@ -1,27 +0,0 @@ -<?xml version="1.0" encoding="UTF-8" standalone="no"?> -<!-- - - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, - software distributed under the License is distributed on an - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - KIND, either express or implied. See the License for the - specific language governing permissions and limitations - under the License. - ---> -<configuration> - <property> - <name>dt.attr.MASTER_MEMORY_MB</name> - <value>1024</value> - </property> -</configuration> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/mobile/src/test/java/com/datatorrent/demos/mobile/ApplicationTest.java ---------------------------------------------------------------------- diff --git a/demos/mobile/src/test/java/com/datatorrent/demos/mobile/ApplicationTest.java b/demos/mobile/src/test/java/com/datatorrent/demos/mobile/ApplicationTest.java deleted file mode 100644 index 87e40bf..0000000 --- a/demos/mobile/src/test/java/com/datatorrent/demos/mobile/ApplicationTest.java +++ /dev/null @@ -1,117 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.demos.mobile; - -import java.net.URI; -import java.util.HashMap; -import java.util.Map; - -import javax.servlet.Servlet; - -import org.eclipse.jetty.server.Connector; -import org.eclipse.jetty.server.Server; -import org.eclipse.jetty.servlet.ServletContextHandler; -import org.eclipse.jetty.servlet.ServletHolder; -import org.junit.Assert; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.hadoop.conf.Configuration; - -import com.datatorrent.api.LocalMode; - -import com.datatorrent.lib.helper.SamplePubSubWebSocketServlet; -import com.datatorrent.lib.io.PubSubWebSocketInputOperator; -import com.datatorrent.lib.io.PubSubWebSocketOutputOperator; -import com.datatorrent.lib.testbench.CollectorTestSink; - -public class ApplicationTest -{ - private static final Logger LOG = LoggerFactory.getLogger(ApplicationTest.class); - - public ApplicationTest() - { - } - - /** - * Test of getApplication method, of class Application. - */ - @Test - public void testGetApplication() throws Exception - { - Configuration conf = new Configuration(false); - conf.addResource("dt-site-mobile.xml"); - Server server = new Server(0); - Servlet servlet = new SamplePubSubWebSocketServlet(); - ServletHolder sh = new ServletHolder(servlet); - ServletContextHandler contextHandler = new ServletContextHandler(server, "/", ServletContextHandler.SESSIONS); - contextHandler.addServlet(sh, "/pubsub"); - contextHandler.addServlet(sh, "/*"); - server.start(); - Connector[] connector = server.getConnectors(); - conf.set("dt.attr.GATEWAY_CONNECT_ADDRESS", "localhost:" + connector[0].getLocalPort()); - URI uri = URI.create("ws://localhost:" + connector[0].getLocalPort() + "/pubsub"); - - PubSubWebSocketOutputOperator<Object> outputOperator = new PubSubWebSocketOutputOperator<Object>(); - outputOperator.setUri(uri); - outputOperator.setTopic(conf.get("dt.application.MobileDemo.operator.QueryLocation.topic")); - - PubSubWebSocketInputOperator<Map<String, String>> inputOperator = new PubSubWebSocketInputOperator<Map<String, String>>(); - inputOperator.setUri(uri); - inputOperator.setTopic(conf.get("dt.application.MobileDemo.operator.LocationResults.topic")); - - CollectorTestSink<Object> sink = new CollectorTestSink<Object>(); - inputOperator.outputPort.setSink(sink); - - Map<String, String> data = new HashMap<String, String>(); - data.put("command", "add"); - data.put("phone", "5559990"); - - Application app = new Application(); - LocalMode lma = LocalMode.newInstance(); - lma.prepareDAG(app, conf); - LocalMode.Controller lc = lma.getController(); - lc.setHeartbeatMonitoringEnabled(false); - lc.runAsync(); - Thread.sleep(5000); - inputOperator.setup(null); - outputOperator.setup(null); - inputOperator.activate(null); - outputOperator.beginWindow(0); - outputOperator.input.process(data); - outputOperator.endWindow(); - inputOperator.beginWindow(0); - int timeoutMillis = 5000; - while (sink.collectedTuples.size() < 5 && timeoutMillis > 0) { - inputOperator.emitTuples(); - timeoutMillis -= 20; - Thread.sleep(20); - } - inputOperator.endWindow(); - lc.shutdown(); - inputOperator.teardown(); - outputOperator.teardown(); - server.stop(); - Assert.assertTrue("size of output is 5 ", sink.collectedTuples.size() == 5); - for (Object obj : sink.collectedTuples) { - Assert.assertEquals("Expected phone number", "5559990", ((Map<String, String>)obj).get("phone")); - } - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/mobile/src/test/resources/dt-site-mobile.xml ---------------------------------------------------------------------- diff --git a/demos/mobile/src/test/resources/dt-site-mobile.xml b/demos/mobile/src/test/resources/dt-site-mobile.xml deleted file mode 100644 index 1759746..0000000 --- a/demos/mobile/src/test/resources/dt-site-mobile.xml +++ /dev/null @@ -1,87 +0,0 @@ -<!-- - - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, - software distributed under the License is distributed on an - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - KIND, either express or implied. See the License for the - specific language governing permissions and limitations - under the License. - ---> -<configuration> - <property> - <name>dt.application.MobileDemo.class</name> - <value>com.datatorrent.demos.mobile.Application</value> - <description>An alias for the application</description> - </property> - <!--property> - <name>dt.attr.GATEWAY_CONNECT_ADDRESS</name> - <value>localhost:19090</value> - </property--> - <property> - <name>dt.application.MobileDemo.totalSeedNumbers</name> - <value>0</value> - </property> - <property> - <name>dt.application.MobileDemo.coolDownMillis</name> - <value>45000</value> - </property> - <property> - <name>dt.application.MobileDemo.maxThroughput</name> - <value>30000</value> - </property> - <property> - <name>dt.application.MobileDemo.minThroughput</name> - <value>1</value> - </property> - <property> - <name>dt.application.MobileDemo.operator.Receiver.tuplesBlast</name> - <value>200</value> - </property> - <property> - <name>dt.application.MobileDemo.operator.Receiver.tuplesBlastIntervalMillis</name> - <value>5</value> - </property> - <property> - <name>dt.application.MobileDemo.operator.Receiver.outputport.integer_data.attr.QUEUE_CAPACITY</name> - <value>32768</value> - </property> - <property> - <name>dt.application.MobileDemo.operator.LocationFinder.range</name> - <value>20</value> - </property> - <property> - <name>dt.application.MobileDemo.operator.LocationFinder.threshold</name> - <value>80</value> - </property> - <property> - <name>dt.application.MobileDemo.operator.LocationFinder.inputport.data.attr.QUEUE_CAPACITY</name> - <value>32768</value> - </property> - <property> - <name>dt.application.MobileDemo.operator.LocationResults.topic</name> - <value>resultTopic</value> - </property> - <property> - <name>dt.application.MobileDemo.operator.QueryLocation.topic</name> - <value>queryTopic</value> - </property> - <property> - <name>dt.application.MobileDemo.operator.*.attr.MEMORY_MB</name> - <value>2048</value> - </property> - <property> - <name>dt.application.MobileDemo.attr.MASTER_MEMORY_MB</name> - <value>1024</value> - </property> -</configuration> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/mobile/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/demos/mobile/src/test/resources/log4j.properties b/demos/mobile/src/test/resources/log4j.properties deleted file mode 100644 index cf0d19e..0000000 --- a/demos/mobile/src/test/resources/log4j.properties +++ /dev/null @@ -1,43 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -log4j.rootLogger=DEBUG,CONSOLE - -log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender -log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout -log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n -log4j.appender.CONSOLE.threshold=${test.log.console.threshold} -test.log.console.threshold=DEBUG - -log4j.appender.RFA=org.apache.log4j.RollingFileAppender -log4j.appender.RFA.layout=org.apache.log4j.PatternLayout -log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n -log4j.appender.RFA.File=/tmp/app.log - -# to enable, add SYSLOG to rootLogger -log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender -log4j.appender.SYSLOG.syslogHost=127.0.0.1 -log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout -log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - %m%n -log4j.appender.SYSLOG.Facility=LOCAL1 - -log4j.logger.org=info -#log4j.logger.org.apache.commons.beanutils=warn -log4j.logger.com.datatorrent=debug -log4j.logger.org.apache.apex=debug http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/mrmonitor/pom.xml ---------------------------------------------------------------------- diff --git a/demos/mrmonitor/pom.xml b/demos/mrmonitor/pom.xml deleted file mode 100644 index 9373063..0000000 --- a/demos/mrmonitor/pom.xml +++ /dev/null @@ -1,64 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, - software distributed under the License is distributed on an - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - KIND, either express or implied. See the License for the - specific language governing permissions and limitations - under the License. - ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - - <artifactId>mrmonitor</artifactId> - <packaging>jar</packaging> - - <name>Apache Apex Malhar MR Monitoring Demo</name> - <description></description> - - <parent> - <groupId>org.apache.apex</groupId> - <artifactId>malhar-demos</artifactId> - <version>3.7.0-SNAPSHOT</version> - </parent> - - <properties> - <skipTests>true</skipTests> - </properties> - - <dependencies> - <dependency> - <groupId>org.eclipse.jetty</groupId> - <artifactId>jetty-websocket</artifactId> - <version>8.1.10.v20130312</version> - <scope>test</scope> - <type>jar</type> - </dependency> - <dependency> - <groupId>org.eclipse.jetty</groupId> - <artifactId>jetty-servlet</artifactId> - <version>8.1.10.v20130312</version> - <scope>test</scope> - <type>jar</type> - </dependency> - <dependency> - <groupId>org.apache.httpcomponents</groupId> - <artifactId>httpclient</artifactId> - <version>4.3.5</version> - <type>jar</type> - </dependency> - </dependencies> - -</project> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/mrmonitor/src/assemble/appPackage.xml ---------------------------------------------------------------------- diff --git a/demos/mrmonitor/src/assemble/appPackage.xml b/demos/mrmonitor/src/assemble/appPackage.xml deleted file mode 100644 index 4138cf2..0000000 --- a/demos/mrmonitor/src/assemble/appPackage.xml +++ /dev/null @@ -1,59 +0,0 @@ -<!-- - - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, - software distributed under the License is distributed on an - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - KIND, either express or implied. See the License for the - specific language governing permissions and limitations - under the License. - ---> -<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd"> - <id>appPackage</id> - <formats> - <format>jar</format> - </formats> - <includeBaseDirectory>false</includeBaseDirectory> - <fileSets> - <fileSet> - <directory>${basedir}/target/</directory> - <outputDirectory>/app</outputDirectory> - <includes> - <include>${project.artifactId}-${project.version}.jar</include> - </includes> - </fileSet> - <fileSet> - <directory>${basedir}/target/deps</directory> - <outputDirectory>/lib</outputDirectory> - </fileSet> - <fileSet> - <directory>${basedir}/src/site/conf</directory> - <outputDirectory>/conf</outputDirectory> - <includes> - <include>*.xml</include> - </includes> - </fileSet> - <fileSet> - <directory>${basedir}/src/main/resources/META-INF</directory> - <outputDirectory>/META-INF</outputDirectory> - </fileSet> - <fileSet> - <directory>${basedir}/src/main/resources/app</directory> - <outputDirectory>/app</outputDirectory> - </fileSet> - </fileSets> - -</assembly> - http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/Application.java ---------------------------------------------------------------------- diff --git a/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/Application.java b/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/Application.java deleted file mode 100644 index 5625439..0000000 --- a/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/Application.java +++ /dev/null @@ -1,56 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.demos.mrmonitor; - -import org.apache.hadoop.conf.Configuration; - -import com.datatorrent.api.DAG; -import com.datatorrent.api.DAG.Locality; -import com.datatorrent.api.StreamingApplication; -import com.datatorrent.api.annotation.ApplicationAnnotation; -import com.datatorrent.lib.io.ConsoleOutputOperator; -import com.datatorrent.lib.testbench.SeedEventGenerator; - -/** - * Application - * - * @since 2.0.0 - */ -@ApplicationAnnotation(name = "MyFirstApplication") -public class Application implements StreamingApplication -{ - - @Override - public void populateDAG(DAG dag, Configuration conf) - { - // Sample DAG with 2 operators - // Replace this code with the DAG you want to build - - SeedEventGenerator seedGen = dag.addOperator("seedGen", SeedEventGenerator.class); - seedGen.setSeedStart(1); - seedGen.setSeedEnd(10); - seedGen.addKeyData("x", 0, 10); - seedGen.addKeyData("y", 0, 100); - - ConsoleOutputOperator cons = dag.addOperator("console", new ConsoleOutputOperator()); - cons.setStringFormat("hello: %s"); - - dag.addStream("seeddata", seedGen.val_list, cons.input).setLocality(Locality.CONTAINER_LOCAL); - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/Constants.java ---------------------------------------------------------------------- diff --git a/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/Constants.java b/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/Constants.java deleted file mode 100644 index 7930405..0000000 --- a/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/Constants.java +++ /dev/null @@ -1,48 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.demos.mrmonitor; - -/** - * <p>Constants class.</p> - * - * @since 0.3.4 - */ -public interface Constants -{ - - public static final int MAX_NUMBER_OF_JOBS = 25; - - public static final String REDUCE_TASK_TYPE = "REDUCE"; - public static final String MAP_TASK_TYPE = "MAP"; - public static final String TASK_TYPE = "type"; - public static final String TASK_ID = "id"; - - public static final String LEAGACY_TASK_ID = "taskId"; - public static final int MAX_TASKS = 2000; - - public static final String QUERY_APP_ID = "app_id"; - public static final String QUERY_JOB_ID = "job_id"; - public static final String QUERY_HADOOP_VERSION = "hadoop_version"; - public static final String QUERY_API_VERSION = "api_version"; - public static final String QUERY_RM_PORT = "rm_port"; - public static final String QUERY_HS_PORT = "hs_port"; - public static final String QUERY_HOST_NAME = "hostname"; - public static final String QUERY_KEY_COMMAND = "command"; - -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/MRJobStatusOperator.java ---------------------------------------------------------------------- diff --git a/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/MRJobStatusOperator.java b/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/MRJobStatusOperator.java deleted file mode 100644 index 263a1a7..0000000 --- a/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/MRJobStatusOperator.java +++ /dev/null @@ -1,623 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.demos.mrmonitor; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; - -import org.codehaus.jettison.json.JSONArray; -import org.codehaus.jettison.json.JSONException; -import org.codehaus.jettison.json.JSONObject; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.datatorrent.api.Context.OperatorContext; -import com.datatorrent.api.DefaultInputPort; -import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.api.Operator; -import com.datatorrent.api.Operator.IdleTimeHandler; - -import com.datatorrent.demos.mrmonitor.MRStatusObject.TaskObject; - -/** - * <p> - * MRJobStatusOperator class. - * </p> - * - * @since 0.3.4 - */ -public class MRJobStatusOperator implements Operator, IdleTimeHandler -{ - private static final Logger LOG = LoggerFactory.getLogger(MRJobStatusOperator.class); - - private static final String JOB_PREFIX = "job_"; - /** - * This outputs the meta information of the job - */ - public final transient DefaultOutputPort<String> output = new DefaultOutputPort<String>(); - /** - * This outputs the map task information of the job - */ - public final transient DefaultOutputPort<String> mapOutput = new DefaultOutputPort<String>(); - /** - * This outputs the reduce task information of the job - */ - public final transient DefaultOutputPort<String> reduceOutput = new DefaultOutputPort<String>(); - /** - * This outputs the counter information of the job - */ - public final transient DefaultOutputPort<String> counterOutput = new DefaultOutputPort<String>(); - /** - * This is time in Ms before making new request for data - */ - private transient int sleepTime = 100; - /** - * This is the number of consecutive windows of no change before the job is removed from map - */ - private int maxRetrials = 10; - /** - * The number of minutes for which the status history of map and reduce tasks is stored - */ - private int statusHistoryTime = 60; - private Map<String, MRStatusObject> jobMap = new HashMap<String, MRStatusObject>(); - /** - * This represents the maximum number of jobs the single instance of this operator is going to server at any time - */ - private int maxJobs = Constants.MAX_NUMBER_OF_JOBS; - private transient Iterator<MRStatusObject> iterator; - - /* - * each input string is a map of the following format {"app_id":<>,"hadoop_version":<>,"api_version":<>,"command":<>, - * "hostname":<>,"hs_port":<>,"rm_port":<>,"job_id":<>} - */ - public final transient DefaultInputPort<MRStatusObject> input = new DefaultInputPort<MRStatusObject>() - { - @Override - public void process(MRStatusObject mrStatusObj) - { - - if (jobMap == null) { - jobMap = new HashMap<String, MRStatusObject>(); - } - - if (jobMap.size() >= maxJobs) { - return; - } - - if ("delete".equalsIgnoreCase(mrStatusObj.getCommand())) { - removeJob(mrStatusObj.getJobId()); - JSONObject outputJsonObject = new JSONObject(); - try { - outputJsonObject.put("id", mrStatusObj.getJobId()); - outputJsonObject.put("removed", "true"); - output.emit(outputJsonObject.toString()); - } catch (JSONException e) { - LOG.warn("Error creating JSON: {}", e.getMessage()); - } - return; - } - if ("clear".equalsIgnoreCase(mrStatusObj.getCommand())) { - clearMap(); - return; - } - - if (jobMap.get(mrStatusObj.getJobId()) != null) { - mrStatusObj = jobMap.get(mrStatusObj.getJobId()); - } - if (mrStatusObj.getHadoopVersion() == 2) { - getJsonForJob(mrStatusObj); - } else if (mrStatusObj.getHadoopVersion() == 1) { - getJsonForLegacyJob(mrStatusObj); - } - mrStatusObj.setStatusHistoryCount(statusHistoryTime); - iterator = jobMap.values().iterator(); - emitHelper(mrStatusObj); - } - }; - - public int getStatusHistoryTime() - { - return statusHistoryTime; - } - - public void setStatusHistoryTime(int statusHistoryTime) - { - this.statusHistoryTime = statusHistoryTime; - if (jobMap != null && jobMap.size() > 0) { - for (Entry<String, MRStatusObject> entry : jobMap.entrySet()) { - entry.getValue().setStatusHistoryCount(statusHistoryTime); - } - } - - } - - /** - * This method gets the latest status of the job from the Resource Manager for jobs submitted on hadoop 2.x version - * - * @param statusObj - */ - private void getJsonForJob(MRStatusObject statusObj) - { - - String url = "http://" + statusObj.getUri() + ":" + statusObj.getRmPort() + "/proxy/application_" + statusObj.getAppId() + "/ws/v1/mapreduce/jobs/job_" + statusObj.getJobId(); - String responseBody = MRUtil.getJsonForURL(url); - - JSONObject jsonObj = MRUtil.getJsonObject(responseBody); - - if (jsonObj == null) { - url = "http://" + statusObj.getUri() + ":" + statusObj.getHistoryServerPort() + "/ws/v1/history/mapreduce/jobs/job_" + statusObj.getJobId(); - responseBody = MRUtil.getJsonForURL(url); - jsonObj = MRUtil.getJsonObject(responseBody); - } - - if (jsonObj != null) { - if (jobMap.get(statusObj.getJobId()) != null) { - MRStatusObject tempObj = jobMap.get(statusObj.getJobId()); - if (tempObj.getJsonObject().toString().equals(jsonObj.toString())) { - getJsonsForTasks(statusObj); - getCounterInfoForJob(statusObj); - return; - } - } - statusObj.setModified(true); - statusObj.setJsonObject(jsonObj); - getCounterInfoForJob(statusObj); - getJsonsForTasks(statusObj); - jobMap.put(statusObj.getJobId(), statusObj); - } - } - - /** - * This method is used to collect the metric information about the job - * - * @param statusObj - */ - private void getCounterInfoForJob(MRStatusObject statusObj) - { - String url = "http://" + statusObj.getUri() + ":" + statusObj.getRmPort() + "/proxy/application_" + statusObj.getAppId() + "/ws/v1/mapreduce/jobs/job_" + statusObj.getJobId() + "/counters"; - String responseBody = MRUtil.getJsonForURL(url); - JSONObject jsonObj = MRUtil.getJsonObject(responseBody); - if (jsonObj == null) { - url = "http://" + statusObj.getUri() + ":" + statusObj.getHistoryServerPort() + "/ws/v1/history/mapreduce/jobs/job_" + statusObj.getJobId() + "/counters"; - responseBody = MRUtil.getJsonForURL(url); - jsonObj = MRUtil.getJsonObject(responseBody); - } - - if (jsonObj != null) { - if (statusObj.getMetricObject() == null) { - statusObj.setMetricObject(new TaskObject(jsonObj)); - } else if (!statusObj.getMetricObject().getJsonString().equalsIgnoreCase(jsonObj.toString())) { - statusObj.getMetricObject().setJson(jsonObj); - statusObj.getMetricObject().setModified(true); - } - } - } - - /** - * This method gets the latest status of the tasks for a job from the Resource Manager for jobs submitted on hadoop - * 2.x version - * - * @param statusObj - */ - private void getJsonsForTasks(MRStatusObject statusObj) - { - String url = "http://" + statusObj.getUri() + ":" + statusObj.getRmPort() + "/proxy/application_" + statusObj.getAppId() + "/ws/v1/mapreduce/jobs/job_" + statusObj.getJobId() + "/tasks/"; - String responseBody = MRUtil.getJsonForURL(url); - - JSONObject jsonObj = MRUtil.getJsonObject(responseBody); - if (jsonObj == null) { - url = "http://" + statusObj.getUri() + ":" + statusObj.getHistoryServerPort() + "/ws/v1/history/mapreduce/jobs/job_" + statusObj.getJobId() + "/tasks/"; - responseBody = MRUtil.getJsonForURL(url); - - jsonObj = MRUtil.getJsonObject(responseBody); - } - - if (jsonObj != null) { - - try { - Map<String, TaskObject> mapTaskOject = statusObj.getMapJsonObject(); - Map<String, TaskObject> reduceTaskOject = statusObj.getReduceJsonObject(); - JSONArray taskJsonArray = jsonObj.getJSONObject("tasks").getJSONArray("task"); - - for (int i = 0; i < taskJsonArray.length(); i++) { - JSONObject taskObj = taskJsonArray.getJSONObject(i); - if (Constants.REDUCE_TASK_TYPE.equalsIgnoreCase(taskObj.getString(Constants.TASK_TYPE))) { - if (reduceTaskOject.get(taskObj.getString(Constants.TASK_ID)) != null) { - TaskObject tempTaskObj = reduceTaskOject.get(taskObj.getString(Constants.TASK_ID)); - if (tempTaskObj.getJsonString().equals(taskObj.toString())) { - continue; - } - tempTaskObj.setJson(taskObj); - tempTaskObj.setModified(true); - reduceTaskOject.put(taskObj.getString(Constants.TASK_ID), tempTaskObj); - continue; - } - reduceTaskOject.put(taskObj.getString(Constants.TASK_ID), new TaskObject(taskObj)); - } else { - if (mapTaskOject.get(taskObj.getString(Constants.TASK_ID)) != null) { - TaskObject tempTaskObj = mapTaskOject.get(taskObj.getString(Constants.TASK_ID)); - if (tempTaskObj.getJsonString().equals(taskObj.toString())) { - continue; - } - tempTaskObj.setJson(taskObj); - tempTaskObj.setModified(true); - mapTaskOject.put(taskObj.getString(Constants.TASK_ID), tempTaskObj); - continue; - } - mapTaskOject.put(taskObj.getString(Constants.TASK_ID), new TaskObject(taskObj)); - } - } - statusObj.setMapJsonObject(mapTaskOject); - statusObj.setReduceJsonObject(reduceTaskOject); - } catch (Exception e) { - LOG.info("exception: {}", e.getMessage()); - } - } - - } - - /** - * This method gets the latest status of the job from the Task Manager for jobs submitted on hadoop 1.x version - * - * @param statusObj - */ - private void getJsonForLegacyJob(MRStatusObject statusObj) - { - - String url = "http://" + statusObj.getUri() + ":" + statusObj.getRmPort() + "/jobdetails.jsp?format=json&jobid=job_" + statusObj.getJobId(); - String responseBody = MRUtil.getJsonForURL(url); - - JSONObject jsonObj = MRUtil.getJsonObject(responseBody); - if (jsonObj == null) { - return; - } - - if (jobMap.get(statusObj.getJobId()) != null) { - MRStatusObject tempObj = jobMap.get(statusObj.getJobId()); - if (tempObj.getJsonObject().toString().equals(jsonObj.toString())) { - getJsonsForLegacyTasks(statusObj, "map"); - getJsonsForLegacyTasks(statusObj, "reduce"); - // output.emit(jsonObj.toString()); - // removeJob(statusObj.getJobId()); - return; - } - } - - // output.emit(jsonObj.toString()); - statusObj.setModified(true); - statusObj.setJsonObject(jsonObj); - getJsonsForLegacyTasks(statusObj, "map"); - getJsonsForLegacyTasks(statusObj, "reduce"); - jobMap.put(statusObj.getJobId(), statusObj); - - } - - /** - * This method gets the latest status of the tasks for a job from the Task Manager for jobs submitted on hadoop 1.x - * version - * - * @param statusObj - * @param type - */ - private void getJsonsForLegacyTasks(MRStatusObject statusObj, String type) - { - try { - JSONObject jobJson = statusObj.getJsonObject(); - int totalTasks = ((JSONObject)((JSONObject)jobJson.get(type + "TaskSummary")).get("taskStats")).getInt("numTotalTasks"); - Map<String, TaskObject> taskMap; - if (type.equalsIgnoreCase("map")) { - taskMap = statusObj.getMapJsonObject(); - } else { - taskMap = statusObj.getReduceJsonObject(); - } - - int totalPagenums = (totalTasks / Constants.MAX_TASKS) + 1; - String baseUrl = "http://" + statusObj.getUri() + ":" + statusObj.getRmPort() + "/jobtasks.jsp?type=" + type + "&format=json&jobid=job_" + statusObj.getJobId() + "&pagenum="; - - for (int pagenum = 1; pagenum <= totalPagenums; pagenum++) { - - String url = baseUrl + pagenum; - String responseBody = MRUtil.getJsonForURL(url); - - JSONObject jsonObj = MRUtil.getJsonObject(responseBody); - if (jsonObj == null) { - return; - } - - JSONArray taskJsonArray = jsonObj.getJSONArray("tasksInfo"); - - for (int i = 0; i < taskJsonArray.length(); i++) { - JSONObject taskObj = taskJsonArray.getJSONObject(i); - { - if (taskMap.get(taskObj.getString(Constants.LEAGACY_TASK_ID)) != null) { - TaskObject tempReduceObj = taskMap.get(taskObj.getString(Constants.LEAGACY_TASK_ID)); - if (tempReduceObj.getJsonString().equals(taskObj.toString())) { - // tempReduceObj.setModified(false); - // taskMap.put(taskObj.getString(Constants.TASK_ID), tempReduceObj); - continue; - } - tempReduceObj.setJson(taskObj); - tempReduceObj.setModified(true); - taskMap.put(taskObj.getString(Constants.TASK_ID), tempReduceObj); - continue; - - } - taskMap.put(taskObj.getString(Constants.LEAGACY_TASK_ID), new TaskObject(taskObj)); - } - } - } - - if (type.equalsIgnoreCase("map")) { - statusObj.setMapJsonObject(taskMap); - } else { - statusObj.setReduceJsonObject(taskMap); - } - } catch (Exception e) { - LOG.info(e.getMessage()); - } - - } - - @Override - public void handleIdleTime() - { - try { - Thread.sleep(sleepTime);// - } catch (InterruptedException ie) { - // If this thread was intrrupted by nother thread - } - if (!iterator.hasNext()) { - iterator = jobMap.values().iterator(); - } - - if (iterator.hasNext()) { - MRStatusObject obj = iterator.next(); - if (obj.getHadoopVersion() == 2) { - getJsonForJob(obj); - } else if (obj.getHadoopVersion() == 1) { - getJsonForLegacyJob(obj); - } - } - } - - @Override - public void setup(OperatorContext context) - { - iterator = jobMap.values().iterator(); - sleepTime = context.getValue(OperatorContext.SPIN_MILLIS); - } - - @Override - public void teardown() - { - } - - @Override - public void beginWindow(long arg0) - { - } - - private void emitHelper(MRStatusObject obj) - { - try { - obj.setModified(false); - output.emit(obj.getJsonObject().toString()); - JSONObject outputJsonObject = new JSONObject(); - - outputJsonObject.put("id", JOB_PREFIX + obj.getJobId()); - outputJsonObject.put("mapHistory", new JSONArray(obj.getMapStatusHistory())); - outputJsonObject.put("reduceHistory", new JSONArray(obj.getReduceStatusHistory())); - outputJsonObject.put("physicalMemoryHistory", new JSONArray(obj.getPhysicalMemeoryStatusHistory())); - outputJsonObject.put("virtualMemoryHistory", new JSONArray(obj.getVirtualMemoryStatusHistory())); - outputJsonObject.put("cpuHistory", new JSONArray(obj.getCpuStatusHistory())); - output.emit(outputJsonObject.toString()); - obj.setChangedHistoryStatus(false); - - outputJsonObject = new JSONObject(); - outputJsonObject.put("id", JOB_PREFIX + obj.getJobId()); - JSONArray arr = new JSONArray(); - - for (Map.Entry<String, TaskObject> mapEntry : obj.getMapJsonObject().entrySet()) { - TaskObject json = mapEntry.getValue(); - json.setModified(false); - arr.put(json.getJson()); - } - - outputJsonObject.put("tasks", arr); - mapOutput.emit(outputJsonObject.toString()); - - outputJsonObject = new JSONObject(); - outputJsonObject.put("id", JOB_PREFIX + obj.getJobId()); - arr = new JSONArray(); - - for (Map.Entry<String, TaskObject> mapEntry : obj.getReduceJsonObject().entrySet()) { - TaskObject json = mapEntry.getValue(); - json.setModified(false); - arr.put(json.getJson()); - } - - outputJsonObject.put("tasks", arr); - reduceOutput.emit(outputJsonObject.toString()); - obj.setRetrials(0); - } catch (Exception e) { - LOG.warn("error creating json {}", e.getMessage()); - } - - } - - @Override - public void endWindow() - { - List<String> delList = new ArrayList<String>(); - try { - for (Map.Entry<String, MRStatusObject> entry : jobMap.entrySet()) { - MRStatusObject obj = entry.getValue(); - - JSONObject outputJsonObject = new JSONObject(); - outputJsonObject.put("id", JOB_PREFIX + obj.getJobId()); - - boolean modified = false; - - if (obj.isModified()) { - modified = true; - obj.setModified(false); - output.emit(obj.getJsonObject().toString()); - if (obj.isChangedHistoryStatus()) { - outputJsonObject.put("mapHistory", new JSONArray(obj.getMapStatusHistory())); - outputJsonObject.put("reduceHistory", new JSONArray(obj.getReduceStatusHistory())); - outputJsonObject.put("physicalMemoryHistory", new JSONArray(obj.getPhysicalMemeoryStatusHistory())); - outputJsonObject.put("virtualMemoryHistory", new JSONArray(obj.getVirtualMemoryStatusHistory())); - outputJsonObject.put("cpuHistory", new JSONArray(obj.getCpuStatusHistory())); - output.emit(outputJsonObject.toString()); - obj.setChangedHistoryStatus(false); - } - } - outputJsonObject = new JSONObject(); - outputJsonObject.put("id", JOB_PREFIX + obj.getJobId()); - JSONArray arr = new JSONArray(); - - for (Map.Entry<String, TaskObject> mapEntry : obj.getMapJsonObject().entrySet()) { - TaskObject json = mapEntry.getValue(); - if (json.isModified()) { - modified = true; - json.setModified(false); - arr.put(json.getJson()); - } - } - - if (arr.length() > 0) { - outputJsonObject.put("tasks", arr); - mapOutput.emit(outputJsonObject.toString()); - } - - outputJsonObject = new JSONObject(); - outputJsonObject.put("id", JOB_PREFIX + obj.getJobId()); - arr = new JSONArray(); - - for (Map.Entry<String, TaskObject> mapEntry : obj.getReduceJsonObject().entrySet()) { - TaskObject json = mapEntry.getValue(); - if (json.isModified()) { - modified = true; - json.setModified(false); - arr.put(json.getJson()); - } - } - if (arr.length() > 0) { - outputJsonObject.put("tasks", arr); - reduceOutput.emit(outputJsonObject.toString()); - } - - if (obj.getMetricObject() != null && obj.getMetricObject().isModified()) { - modified = true; - obj.getMetricObject().setModified(false); - counterOutput.emit(obj.getMetricObject().getJsonString()); - } - - if (!modified) { - if (obj.getRetrials() >= maxRetrials) { - delList.add(obj.getJobId()); - } else { - obj.setRetrials(obj.getRetrials() + 1); - } - } else { - obj.setRetrials(0); - } - } - } catch (Exception ex) { - LOG.warn("error creating json {}", ex.getMessage()); - } - - if (!delList.isEmpty()) { - Iterator<String> itr = delList.iterator(); - while (itr.hasNext()) { - removeJob(itr.next()); - } - } - - } - - /** - * This method removes the job from the map - * - * @param jobId - */ - public void removeJob(String jobId) - { - if (jobMap != null) { - jobMap.remove(jobId); - iterator = jobMap.values().iterator(); - } - } - - /** - * This method clears the job map - */ - public void clearMap() - { - if (jobMap != null) { - jobMap.clear(); - iterator = jobMap.values().iterator(); - } - } - - /** - * This returns the maximum number of jobs the single instance of this operator is going to server at any time - * - * @return - */ - public int getMaxJobs() - { - return maxJobs; - } - - /** - * This sets the maximum number of jobs the single instance of this operator is going to server at any time - * - * @param maxJobs - */ - public void setMaxJobs(int maxJobs) - { - this.maxJobs = maxJobs; - } - - /** - * This sets the number of consecutive windows of no change before the job is removed from map - * - * @return - */ - public int getMaxRetrials() - { - return maxRetrials; - } - - /** - * This returns the number of consecutive windows of no change before the job is removed from map - * - * @param maxRetrials - */ - public void setMaxRetrials(int maxRetrials) - { - this.maxRetrials = maxRetrials; - } - -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/MRMonitoringApplication.java ---------------------------------------------------------------------- diff --git a/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/MRMonitoringApplication.java b/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/MRMonitoringApplication.java deleted file mode 100644 index 037378a..0000000 --- a/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/MRMonitoringApplication.java +++ /dev/null @@ -1,93 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.demos.mrmonitor; - -import java.net.URI; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.hadoop.conf.Configuration; - -import com.datatorrent.api.DAG; -import com.datatorrent.api.StreamingApplication; -import com.datatorrent.api.annotation.ApplicationAnnotation; - -import com.datatorrent.lib.io.PubSubWebSocketInputOperator; -import com.datatorrent.lib.io.PubSubWebSocketOutputOperator; - -/** - * <p> - * MRDebuggerApplication class. - * </p> - * - * @since 0.3.4 - */ -@ApplicationAnnotation(name = "MRMonitoringDemo") -public class MRMonitoringApplication implements StreamingApplication -{ - - private static final Logger logger = LoggerFactory.getLogger(MRMonitoringApplication.class); - - @Override - public void populateDAG(DAG dag, Configuration conf) - { - String daemonAddress = dag.getValue(DAG.GATEWAY_CONNECT_ADDRESS); - MRJobStatusOperator mrJobOperator = dag.addOperator("JobMonitor", new MRJobStatusOperator()); - URI uri = URI.create("ws://" + daemonAddress + "/pubsub"); - logger.info("WebSocket with daemon at {}", daemonAddress); - - PubSubWebSocketInputOperator wsIn = dag.addOperator("Query", new PubSubWebSocketInputOperator()); - wsIn.setUri(uri); - - MapToMRObjectOperator queryConverter = dag.addOperator("QueryConverter", new MapToMRObjectOperator()); - - /** - * This is used to emit the meta data about the job - */ - PubSubWebSocketOutputOperator<Object> wsOut = dag.addOperator("JobOutput", new PubSubWebSocketOutputOperator<Object>()); - wsOut.setUri(uri); - - /** - * This is used to emit the information of map tasks of the job - */ - PubSubWebSocketOutputOperator<Object> wsMapOut = dag.addOperator("MapJob", new PubSubWebSocketOutputOperator<Object>()); - wsMapOut.setUri(uri); - - /** - * This is used to emit the information of reduce tasks of the job - */ - PubSubWebSocketOutputOperator<Object> wsReduceOut = dag.addOperator("ReduceJob", new PubSubWebSocketOutputOperator<Object>()); - wsReduceOut.setUri(uri); - - /** - * This is used to emit the metric information of the job - */ - PubSubWebSocketOutputOperator<Object> wsCounterOut = dag.addOperator("JobCounter", new PubSubWebSocketOutputOperator<Object>()); - wsCounterOut.setUri(uri); - - dag.addStream("QueryConversion", wsIn.outputPort, queryConverter.input); - dag.addStream("QueryProcessing", queryConverter.output, mrJobOperator.input); - dag.addStream("JobData", mrJobOperator.output, wsOut.input); - dag.addStream("MapData", mrJobOperator.mapOutput, wsMapOut.input); - dag.addStream("ReduceData", mrJobOperator.reduceOutput, wsReduceOut.input); - dag.addStream("CounterData", mrJobOperator.counterOutput, wsCounterOut.input); - } - -}
