http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/MRStatusObject.java ---------------------------------------------------------------------- diff --git a/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/MRStatusObject.java b/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/MRStatusObject.java deleted file mode 100644 index 481f3dc..0000000 --- a/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/MRStatusObject.java +++ /dev/null @@ -1,501 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.demos.mrmonitor; - -import java.util.LinkedList; -import java.util.Map; -import java.util.Queue; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - -import org.codehaus.jettison.json.JSONArray; -import org.codehaus.jettison.json.JSONException; -import org.codehaus.jettison.json.JSONObject; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * <p> - * MRStatusObject class. 
- * </p> - * - * @since 0.3.4 - */ -public class MRStatusObject -{ - private String command; - /** - * This stores the Resource Manager/ Task Manager's host information - */ - private String uri; - /** - * This field stores the job id - */ - private String jobId; - /** - * This field stores the api version of the rest apis - */ - private String apiVersion; - /** - * This field stores the hadoop version 1 for 1.x and 2 for 2.x - */ - private int hadoopVersion; - /** - * This field stores the app id for the hadoop 2.x - */ - private String appId; - /** - * This field stores the RM port information for hadoop 2.x / Task Manager server port for hadoop 1.X from where we - * can get the job information - */ - private int rmPort; - /** - * This field stores the history server information for hadoop 2.x from where we can get the job information - */ - private int historyServerPort; - /** - * This field stores the job information as json object - */ - private JSONObject jsonObject; - /** - * This field tells if the object has been modified - */ - private boolean modified; - /** - * This stores the mapping of map task ids to the TaskObject - */ - private Map<String, TaskObject> mapJsonObject; - /** - * This stores the mapping of reduce task ids to the TaskObject - */ - private Map<String, TaskObject> reduceJsonObject; - /** - * This holds the information about the various metrics like MAP_OUTPUT_RECORDS etc for this job - */ - private TaskObject metricObject; - - /** - * This holds the number of windows occurred when the new data was retrieved for this job - */ - private int retrials; - - /** - * The scheduler is used to store the job status every 1 minute - */ - private transient ScheduledExecutorService statusScheduler; - - /** - * This stores the progress of the map tasks - */ - Queue<String> mapStatusHistory; - - /** - * This stores the progress of the reduce tasks - */ - Queue<String> reduceStatusHistory; - - /** - * This stores the history of the physical memory usage 
- */ - Queue<String> physicalMemoryStatusHistory; - - /** - * This stores the history of the virtual memory usage - */ - Queue<String> virtualMemoryStatusHistory; - - /** - * This stores the history of the cpu - */ - Queue<String> cpuStatusHistory; - - /** - * The number of minutes for which the status history of map and reduce tasks is stored - */ - private int statusHistoryCount = 60; - - /** - * This field notifies if history status queues have changed over time - */ - private boolean changedHistoryStatus; - - public MRStatusObject() - { - retrials = 0; - modified = true; - mapJsonObject = new ConcurrentHashMap<String, TaskObject>(); - reduceJsonObject = new ConcurrentHashMap<String, TaskObject>(); - mapStatusHistory = new LinkedList<String>(); - reduceStatusHistory = new LinkedList<String>(); - physicalMemoryStatusHistory = new LinkedList<String>(); - virtualMemoryStatusHistory = new LinkedList<String>(); - cpuStatusHistory = new LinkedList<String>(); - statusScheduler = Executors.newScheduledThreadPool(1); - statusScheduler.scheduleAtFixedRate(new Runnable() - { - @Override - public void run() - { - if (jsonObject != null) { - changedHistoryStatus = true; - if (mapStatusHistory.size() > statusHistoryCount) { - mapStatusHistory.poll(); - reduceStatusHistory.poll(); - physicalMemoryStatusHistory.poll(); - virtualMemoryStatusHistory.poll(); - cpuStatusHistory.poll(); - } - if (hadoopVersion == 2) { - try { - mapStatusHistory.add(jsonObject.getJSONObject("job").getString("mapProgress")); - reduceStatusHistory.add(jsonObject.getJSONObject("job").getString("reduceProgress")); - if (metricObject.getJson() != null) { - JSONArray arr = metricObject.getJson().getJSONObject("jobCounters").getJSONArray("counterGroup"); - int length = arr.length(); - for (int i = 0; i < length; i++) { - if (arr.getJSONObject(i).get("counterGroupName").equals("org.apache.hadoop.mapreduce.TaskCounter")) { - JSONArray counters = arr.getJSONObject(i).getJSONArray("counter"); - for (int j = 0; 
j < counters.length(); j++) { - JSONObject counterObj = counters.getJSONObject(j); - if (counterObj.get("name").equals("PHYSICAL_MEMORY_BYTES")) { - physicalMemoryStatusHistory.add(counterObj.getString("totalCounterValue")); - } else if (counterObj.get("name").equals("VIRTUAL_MEMORY_BYTES")) { - virtualMemoryStatusHistory.add(counterObj.getString("totalCounterValue")); - } else if (counterObj.get("name").equals("CPU_MILLISECONDS")) { - cpuStatusHistory.add(counterObj.getString("totalCounterValue")); - } - } - break; - } - } - } - } catch (JSONException e) { - logger.error("error setting status history {}", e.getMessage()); - } - } else { - try { - mapStatusHistory.add(jsonObject.getJSONObject("mapTaskSummary").getString("progressPercentage")); - reduceStatusHistory.add(jsonObject.getJSONObject("reduceTaskSummary").getString("progressPercentage")); - JSONArray arr = jsonObject.getJSONArray("jobCounterGroupsInfo"); - int length = arr.length(); - for (int i = 0; i < length; i++) { - if (arr.getJSONObject(i).get("groupName").equals("Map-Reduce Framework")) { - JSONArray counters = arr.getJSONObject(i).getJSONArray("jobCountersInfo"); - for (int j = 0; j < counters.length(); j++) { - JSONObject counterObj = counters.getJSONObject(j); - if (counterObj.get("name").equals("Physical memory (bytes) snapshot")) { - physicalMemoryStatusHistory.add(counterObj.getString("totalValue")); - } else if (counterObj.get("name").equals("Virtual memory (bytes) snapshot")) { - virtualMemoryStatusHistory.add(counterObj.getString("totalValue")); - } else if (counterObj.get("name").equals("CPU time spent (ms)")) { - cpuStatusHistory.add(counterObj.getString("totalValue")); - } - } - break; - } - } - } catch (JSONException e) { - logger.error("error setting status history {}", e.getMessage()); - } - } - } - } - }, 0, 1, TimeUnit.MINUTES); - } - - public Map<String, TaskObject> getMapJsonObject() - { - return mapJsonObject; - } - - public void setMapJsonObject(Map<String, TaskObject> 
mapJsonObject) - { - this.mapJsonObject = mapJsonObject; - } - - public Map<String, TaskObject> getReduceJsonObject() - { - return reduceJsonObject; - } - - public void setReduceJsonObject(Map<String, TaskObject> reduceJsonObject) - { - this.reduceJsonObject = reduceJsonObject; - } - - public String getUri() - { - return uri; - } - - public void setUri(String uri) - { - this.uri = uri; - } - - public String getJobId() - { - return jobId; - } - - public void setJobId(String jobId) - { - this.jobId = jobId; - } - - public String getApiVersion() - { - return apiVersion; - } - - public void setApiVersion(String apiVersion) - { - this.apiVersion = apiVersion; - } - - public int getHadoopVersion() - { - return hadoopVersion; - } - - public void setHadoopVersion(int hadoopVersion) - { - this.hadoopVersion = hadoopVersion; - } - - public String getAppId() - { - return appId; - } - - public void setAppId(String appId) - { - this.appId = appId; - } - - public int getRmPort() - { - return rmPort; - } - - public void setRmPort(int rmPort) - { - this.rmPort = rmPort; - } - - public int getHistoryServerPort() - { - return historyServerPort; - } - - public void setHistoryServerPort(int historyServerPort) - { - this.historyServerPort = historyServerPort; - } - - public JSONObject getJsonObject() - { - return jsonObject; - } - - public void setJsonObject(JSONObject jsonObject) - { - this.jsonObject = jsonObject; - } - - public boolean isChangedHistoryStatus() - { - return changedHistoryStatus; - } - - public void setChangedHistoryStatus(boolean changedHistoryStatus) - { - this.changedHistoryStatus = changedHistoryStatus; - } - - @Override - public boolean equals(Object that) - { - if (this == that) { - return true; - } - if (!(that instanceof MRStatusObject)) { - return false; - } - if (this.hashCode() == that.hashCode()) { - return true; - } - return false; - } - - @Override - public int hashCode() - { - return (uri + jobId + apiVersion + String.valueOf(hadoopVersion)).hashCode(); 
- - } - - public String getCommand() - { - return command; - } - - public void setCommand(String command) - { - this.command = command; - } - - public boolean isModified() - { - return modified; - } - - public void setModified(boolean modified) - { - this.modified = modified; - } - - public int getRetrials() - { - return retrials; - } - - public void setRetrials(int retrials) - { - this.retrials = retrials; - } - - public TaskObject getMetricObject() - { - return metricObject; - } - - public void setMetricObject(TaskObject metricObject) - { - this.metricObject = metricObject; - } - - public int getStatusHistoryCount() - { - return statusHistoryCount; - } - - public void setStatusHistoryCount(int statusHistoryCount) - { - this.statusHistoryCount = statusHistoryCount; - } - - public Queue<String> getMapStatusHistory() - { - return mapStatusHistory; - } - - public Queue<String> getReduceStatusHistory() - { - return reduceStatusHistory; - } - - public Queue<String> getPhysicalMemeoryStatusHistory() - { - return physicalMemoryStatusHistory; - } - - public Queue<String> getVirtualMemoryStatusHistory() - { - return virtualMemoryStatusHistory; - } - - public Queue<String> getCpuStatusHistory() - { - return cpuStatusHistory; - } - - public static class TaskObject - { - /** - * This field stores the task information as json - */ - private JSONObject json; - /** - * This field tells if the object was modified - */ - private boolean modified; - - public TaskObject(JSONObject json) - { - modified = true; - this.json = json; - } - - /** - * This returns the task information as json - * - * @return - */ - public JSONObject getJson() - { - return json; - } - - /** - * This stores the task information as json - * - * @param json - */ - public void setJson(JSONObject json) - { - this.json = json; - } - - /** - * This returns if the json object has been modified - * - * @return - */ - public boolean isModified() - { - return modified; - } - - /** - * This sets if the json object is 
modified - * - * @param modified - */ - public void setModified(boolean modified) - { - this.modified = modified; - } - - /** - * This returns the string format of the json object - * - * @return - */ - public String getJsonString() - { - return json.toString(); - } - } - - private static Logger logger = LoggerFactory.getLogger(MRStatusObject.class); - -}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/MRUtil.java ---------------------------------------------------------------------- diff --git a/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/MRUtil.java b/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/MRUtil.java deleted file mode 100644 index 0d7f6af..0000000 --- a/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/MRUtil.java +++ /dev/null @@ -1,99 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.demos.mrmonitor; - -import java.io.IOException; - -import org.codehaus.jettison.json.JSONObject; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.http.client.ClientProtocolException; -import org.apache.http.client.HttpClient; -import org.apache.http.client.ResponseHandler; -import org.apache.http.client.methods.HttpGet; -import org.apache.http.impl.client.BasicResponseHandler; -import org.apache.http.impl.client.DefaultHttpClient; - -/** - * <p> - * Util class. 
- * </p> - * - * @since 0.3.4 - */ -public class MRUtil -{ - - private static final Logger logger = LoggerFactory.getLogger(MRUtil.class); - - /** - * This method returns the response content for a given url - * @param url - * @return - */ - public static String getJsonForURL(String url) - { - HttpClient httpclient = new DefaultHttpClient(); - logger.debug(url); - try { - - - HttpGet httpget = new HttpGet(url); - - // Create a response handler - ResponseHandler<String> responseHandler = new BasicResponseHandler(); - String responseBody; - try { - responseBody = httpclient.execute(httpget, responseHandler); - - } catch (ClientProtocolException e) { - logger.debug(e.getMessage()); - return null; - - } catch (IOException e) { - logger.debug(e.getMessage()); - return null; - } catch (Exception e) { - logger.debug(e.getMessage()); - return null; - } - return responseBody.trim(); - } finally { - httpclient.getConnectionManager().shutdown(); - } - } - - /** - * This method returns the JSONObject for a given string - * @param json - * @return - */ - public static JSONObject getJsonObject(String json) - { - try { - JSONObject jsonObj = new JSONObject(json); - return jsonObj; - } catch (Exception e) { - logger.debug("{}", e.getMessage()); - return null; - } - } - -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/MapToMRObjectOperator.java ---------------------------------------------------------------------- diff --git a/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/MapToMRObjectOperator.java b/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/MapToMRObjectOperator.java deleted file mode 100644 index 5075163..0000000 --- a/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/MapToMRObjectOperator.java +++ /dev/null @@ -1,89 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.demos.mrmonitor; - -import java.util.Map; - -import com.datatorrent.api.Context.OperatorContext; -import com.datatorrent.api.DefaultInputPort; -import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.api.Operator; - -/** - * <p>MapToMRObjectOperator class.</p> - * - * @since 0.9.0 - */ -public class MapToMRObjectOperator implements Operator -{ - - public final transient DefaultInputPort<Map<String, String>> input = new DefaultInputPort<Map<String, String>>() - { - @Override - public void process(Map<String, String> tuple) - { - MRStatusObject mrStatusObj = new MRStatusObject(); - - for (Map.Entry<String, String> e : tuple.entrySet()) { - if (e.getKey().equals(Constants.QUERY_KEY_COMMAND)) { - mrStatusObj.setCommand(e.getValue()); - } else if (e.getKey().equals(Constants.QUERY_API_VERSION)) { - mrStatusObj.setApiVersion(e.getValue()); - } else if (e.getKey().equals(Constants.QUERY_APP_ID)) { - mrStatusObj.setAppId(e.getValue()); - } else if (e.getKey().equals(Constants.QUERY_HADOOP_VERSION)) { - mrStatusObj.setHadoopVersion(Integer.parseInt(e.getValue())); - } else if (e.getKey().equals(Constants.QUERY_HOST_NAME)) { - mrStatusObj.setUri(e.getValue()); - } else if (e.getKey().equals(Constants.QUERY_HS_PORT)) { - 
mrStatusObj.setHistoryServerPort(Integer.parseInt(e.getValue())); - } else if (e.getKey().equals(Constants.QUERY_JOB_ID)) { - mrStatusObj.setJobId(e.getValue()); - } else if (e.getKey().equals(Constants.QUERY_RM_PORT)) { - mrStatusObj.setRmPort(Integer.parseInt(e.getValue())); - } - } - output.emit(mrStatusObj); - - } - }; - - public final transient DefaultOutputPort<MRStatusObject> output = new DefaultOutputPort<MRStatusObject>(); - - @Override - public void setup(OperatorContext context) - { - } - - @Override - public void teardown() - { - } - - @Override - public void beginWindow(long windowId) - { - } - - @Override - public void endWindow() - { - } - -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/mrmonitor/src/main/resources/META-INF/properties.xml ---------------------------------------------------------------------- diff --git a/demos/mrmonitor/src/main/resources/META-INF/properties.xml b/demos/mrmonitor/src/main/resources/META-INF/properties.xml deleted file mode 100644 index 66de05e..0000000 --- a/demos/mrmonitor/src/main/resources/META-INF/properties.xml +++ /dev/null @@ -1,63 +0,0 @@ -<?xml version="1.0"?> -<!-- - - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, - software distributed under the License is distributed on an - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - KIND, either express or implied. See the License for the - specific language governing permissions and limitations - under the License. 
- ---> -<configuration> - <property> - <name>dt.application.MRMonitoringDemo.operator.JobMonitor.attr.PARTITIONER</name> - <value>com.datatorrent.common.partitioner.StatelessPartitioner:1</value> - </property> - <property> - <name>dt.application.MRMonitoringDemo.operator.JobMonitor.attr.APPLICATION_WINDOW_COUNT</name> - <value>4</value> - </property> - <property> - <name>dt.application.MRMonitoringDemo.operator.JobMonitor.prop.maxJobs</name> - <value>25</value> - </property> - <property> - <name>dt.application.MRMonitoringDemo.operator.JobMonitor.prop.maxJobs</name> - <value>25</value> - </property> - <property> - <name>dt.application.MRMonitoringDemo.operator.Query.prop.topic</name> - <value>contrib.summit.mrDebugger.mrDebuggerQuery</value> - </property> - <property> - <name>dt.application.MRMonitoringDemo.operator.JobOutput.prop.topic</name> - <value>contrib.summit.mrDebugger.jobResult</value> - </property> - <property> - <name>dt.application.MRMonitoringDemo.operator.MapJob.prop.topic</name> - <value>contrib.summit.mrDebugger.mapResult</value> - </property> - <property> - <name>dt.application.MRMonitoringDemo.operator.ReduceJob.prop.topic</name> - <value>contrib.summit.mrDebugger.reduceResult</value> - </property> - <property> - <name>dt.application.MRMonitoringDemo.operator.JobCounter.prop.topic</name> - <value>contrib.summit.mrDebugger.counterResult</value> - </property> - <property> - <name>dt.application.MRMonitoringDemo.stream.QueryConversion.locality</name> - <value>CONTAINER_LOCAL</value> - </property> -</configuration> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/mrmonitor/src/main/resources/mrdebugger.html ---------------------------------------------------------------------- diff --git a/demos/mrmonitor/src/main/resources/mrdebugger.html b/demos/mrmonitor/src/main/resources/mrdebugger.html deleted file mode 100644 index d8e6497..0000000 --- a/demos/mrmonitor/src/main/resources/mrdebugger.html +++ /dev/null @@ -1,237 +0,0 @@ 
<!--

    Licensed to the Apache Software Foundation (ASF) under one
    or more contributor license agreements.  See the NOTICE file
    distributed with this work for additional information
    regarding copyright ownership.  The ASF licenses this file
    to you under the Apache License, Version 2.0 (the
    "License"); you may not use this file except in compliance
    with the License.  You may obtain a copy of the License at

      http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing,
    software distributed under the License is distributed on an
    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    KIND, either express or implied.  See the License for the
    specific language governing permissions and limitations
    under the License.

-->
<!doctype html>
<html>
<head>
<title>Mobile Demo</title>

<META HTTP-EQUIV="CACHE-CONTROL" CONTENT="NO-CACHE">
<meta name="viewport" content="initial-scale=1.0, user-scalable=no">
<meta charset="utf-8">
<script src="http://ajax.googleapis.com/ajax/libs/jquery/1.7.1/jquery.min.js"></script>
<script src="http://ajax.googleapis.com/ajax/libs/jqueryui/1.10.3/jquery-ui.min.js"></script>
<!--script src="js/vendor/jquery/dist/jquery.js"></script-->
<style>
    body {
        margin: 0;
    }

    .phone-input {
        margin-left: 0.5em;
        margin-right: 0.5em;
    }
</style>

</head>


<body>


<script>

$(function() {

    // Replaces the confirmation area with a heading (inserted as plain text) and the standard hint.
    function showConfirmation(heading) {
        var message = $("<div id='message'></div>");
        message.append($("<h2></h2>").text(heading));
        message.append("<p>Result will appear on page shortly.</p>");
        $('#query1SubmitConfirm').empty().append(message);
    }

    // Publishes one query message on the query topic over a short-lived socket, then runs the callback.
    function sendQuery(jsonData, callback) {
        var ws = new WebSocket('ws://'+window.location.host+'/pubsub');

        ws.onopen = function () {
            var topic = "contrib.summit.mrDebugger.mrDebuggerQuery";
            var msg = JSON.stringify({ "type" : "publish", "topic" : topic, "data" : jsonData });
            ws.send(msg);
            console.log("published to: " + topic + " data: " + msg);
            ws.close();
            if (callback) callback();
        };

        ws.onerror = function (error) {
            console.log('WebSocket Error ' + error);
        };

        ws.onmessage = function (e) {
            console.log('Server: ' + e.data);
        };
        ws.onclose = function (e) {
            console.log('close: ' , e);
        };
    }

    // Subscribes to a topic and appends every received message, as text, to the given element.
    function subscribe(topic, resultSelector) {
        var socket = new WebSocket('ws://'+window.location.host+'/pubsub');

        socket.onopen = function () {
            var msg = JSON.stringify({ "type":"subscribe", "topic": topic});
            console.log("sending: " + msg);
            socket.send(msg);
        };

        socket.onerror = function (error) {
            console.log('WebSocket Error ' + error);
        };

        socket.onmessage = function (e) {
            // A text node keeps markup characters inside the payload from being parsed as HTML.
            $(resultSelector).append(document.createTextNode(e.data + "\n"));
        };
    }

    $("#query1AddButton").click(function() {

        var jsonData = {
            command : 'add',
            hostname : $("input#hostname").val(),
            app_id : $("input#app_id").val(),
            job_id : $("input#job_id").val(),
            hadoop_version : $("input#hadoop_version").val(),
            api_version : $("input#api_version").val(),
            rm_port : $("input#rm_port").val(),
            hs_port : $("input#hs_port").val()
        };

        sendQuery(jsonData, function() {
            showConfirmation("Add submitted to application!");
        });

        // Keep the browser from submitting the form and reloading the page.
        return false;
    });

    $("#query1DeleteButton").click(function() {

        var job_id = $("input#job_id").val();

        var jsonData = {
            command : 'delete',
            query : job_id
        };

        sendQuery(jsonData, function() {
            showConfirmation("Delete " + job_id + " submitted to application!");
        });

        // Keep the browser from submitting the form and reloading the page.
        return false;
    });

    subscribe("contrib.summit.mrDebugger.jobResult", '#jobQueryResult');
    subscribe("contrib.summit.mrDebugger.mapResult", '#jobMapQueryResult');
    subscribe("contrib.summit.mrDebugger.reduceResult", '#jobReduceQueryResult');

});

</script>


<div id="query1FormDiv">
    <form name="query1" action="">
        <p>
            <label for="app_id" id="app_id_label">Application Id</label>
            <input type="text" name="app_id" id="app_id" size="30" value="" class="phone-input" />
        </p>
        <p>
            <label for="job_id" id="job_id_label">Job Id</label>
            <input type="text" name="job_id" id="job_id" size="30" value="" class="phone-input" />
        </p>
        <p>
            <label for="hostname" id="hostname_label">Hostname</label>
            <input type="text" name="hostname" id="hostname" size="30" value="" class="phone-input" />
        </p>
        <p>
            <label for="rm_port" id="rm_port_label">RM port</label>
            <input type="text" name="rm_port" id="rm_port" size="30" value="" class="phone-input" />
        </p>
        <p>
            <label for="hs_port" id="hs_port_label">History Server port</label>
            <input type="text" name="hs_port" id="hs_port" size="30" value="" class="phone-input" />
        </p>
        <p>
            <label for="hadoop_version" id="hadoop_version_label">Hadoop Version</label>
            <input type="text" name="hadoop_version" id="hadoop_version" size="30" value="" class="phone-input" />
        </p>
        <p>
            <label for="api_version" id="api_version_label">API Version</label>
            <input type="text" name="api_version" id="api_version" size="30" value="" class="phone-input" />
        </p>
        <p>
            <input type="submit" name="command" class="button" id="query1AddButton" value="Add" />
            <input type="submit" name="command" class="button" id="query1DeleteButton" value="Delete" />
            <input type="submit" name="command" class="button" id="query1ClearButton" value="Clear" />
        </p>
    </form>
    <div id="query1SubmitConfirm"></div>
    <div>Job: <span id="jobQueryResult"></span></div>
    <div>Map Task: <span id="jobMapQueryResult"></span></div>
    <div>Reduce Job: <span id="jobReduceQueryResult"></span></div>
</div>

</body>
</html>
- ---> -<configuration> - <property> - <name>dt.attr.MASTER_MEMORY_MB</name> - <value>1024</value> - </property> -</configuration> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/mrmonitor/src/test/java/com/datatorrent/demos/mrmonitor/MrMonitoringApplicationTest.java ---------------------------------------------------------------------- diff --git a/demos/mrmonitor/src/test/java/com/datatorrent/demos/mrmonitor/MrMonitoringApplicationTest.java b/demos/mrmonitor/src/test/java/com/datatorrent/demos/mrmonitor/MrMonitoringApplicationTest.java deleted file mode 100644 index ad8de02..0000000 --- a/demos/mrmonitor/src/test/java/com/datatorrent/demos/mrmonitor/MrMonitoringApplicationTest.java +++ /dev/null @@ -1,66 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package com.datatorrent.demos.mrmonitor; - -import javax.servlet.Servlet; - -import org.eclipse.jetty.server.Connector; -import org.eclipse.jetty.server.Server; -import org.eclipse.jetty.servlet.ServletContextHandler; -import org.eclipse.jetty.servlet.ServletHolder; -import org.junit.Test; - -import org.apache.hadoop.conf.Configuration; - -import com.datatorrent.api.LocalMode; -import com.datatorrent.lib.helper.SamplePubSubWebSocketServlet; - -/** - * <p>MapReduceDebuggerApplicationTest class.</p> - * - * @since 0.3.4 - */ - -public class MrMonitoringApplicationTest -{ - - @Test - public void testApplication() throws Exception - { - Configuration conf = new Configuration(false); - conf.addResource("dt-site-monitoring.xml"); - Server server = new Server(0); - Servlet servlet = new SamplePubSubWebSocketServlet(); - ServletHolder sh = new ServletHolder(servlet); - ServletContextHandler contextHandler = new ServletContextHandler(server, "/", ServletContextHandler.SESSIONS); - contextHandler.addServlet(sh, "/pubsub"); - contextHandler.addServlet(sh, "/*"); - server.start(); - Connector[] connector = server.getConnectors(); - conf.set("dt.attr.GATEWAY_CONNECT_ADDRESS", "localhost:" + connector[0].getLocalPort()); - - MRMonitoringApplication application = new MRMonitoringApplication(); - LocalMode lma = LocalMode.newInstance(); - lma.prepareDAG(application, conf); - LocalMode.Controller lc = lma.getController(); - lc.run(10000); - server.stop(); - } - -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/mrmonitor/src/test/resources/dt-site-monitoring.xml ---------------------------------------------------------------------- diff --git a/demos/mrmonitor/src/test/resources/dt-site-monitoring.xml b/demos/mrmonitor/src/test/resources/dt-site-monitoring.xml deleted file mode 100644 index d1e12c6..0000000 --- a/demos/mrmonitor/src/test/resources/dt-site-monitoring.xml +++ /dev/null @@ -1,63 +0,0 @@ -<?xml version="1.0"?> -<!-- - - Licensed to the 
Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, - software distributed under the License is distributed on an - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - KIND, either express or implied. See the License for the - specific language governing permissions and limitations - under the License. - ---> -<configuration> - <property> - <name>dt.application.MRMonitoringDemo.operator.JobMonitor.attr.INITIAL_PARTITION_COUNT</name> - <value>1</value> - </property> - <property> - <name>dt.application.MRMonitoringDemo.operator.JobMonitor.attr.APPLICATION_WINDOW_COUNT</name> - <value>4</value> - </property> - <property> - <name>dt.application.MRMonitoringDemo.operator.JobMonitor.prop.maxJobs</name> - <value>25</value> - </property> - <property> - <name>dt.application.MRMonitoringDemo.operator.JobMonitor.prop.maxJobs</name> - <value>25</value> - </property> - <property> - <name>dt.application.MRMonitoringDemo.operator.Query.prop.topic</name> - <value>contrib.summit.mrDebugger.mrDebuggerQuery</value> - </property> - <property> - <name>dt.application.MRMonitoringDemo.operator.JobOutput.prop.topic</name> - <value>contrib.summit.mrDebugger.jobResult</value> - </property> - <property> - <name>dt.application.MRMonitoringDemo.operator.MapJob.prop.topic</name> - <value>contrib.summit.mrDebugger.mapResult</value> - </property> - <property> - <name>dt.application.MRMonitoringDemo.operator.ReduceJob.prop.topic</name> - <value>contrib.summit.mrDebugger.reduceResult</value> - </property> - <property> - 
<name>dt.application.MRMonitoringDemo.operator.JobCounter.prop.topic</name> - <value>contrib.summit.mrDebugger.counterResult</value> - </property> - <property> - <name>dt.application.MRMonitoringDemo.stream.QueryConversion.locality</name> - <value>CONTAINER_LOCAL</value> - </property> -</configuration> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/mrmonitor/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/demos/mrmonitor/src/test/resources/log4j.properties b/demos/mrmonitor/src/test/resources/log4j.properties deleted file mode 100644 index cf0d19e..0000000 --- a/demos/mrmonitor/src/test/resources/log4j.properties +++ /dev/null @@ -1,43 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. 
-# - -log4j.rootLogger=DEBUG,CONSOLE - -log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender -log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout -log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n -log4j.appender.CONSOLE.threshold=${test.log.console.threshold} -test.log.console.threshold=DEBUG - -log4j.appender.RFA=org.apache.log4j.RollingFileAppender -log4j.appender.RFA.layout=org.apache.log4j.PatternLayout -log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n -log4j.appender.RFA.File=/tmp/app.log - -# to enable, add SYSLOG to rootLogger -log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender -log4j.appender.SYSLOG.syslogHost=127.0.0.1 -log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout -log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - %m%n -log4j.appender.SYSLOG.Facility=LOCAL1 - -log4j.logger.org=info -#log4j.logger.org.apache.commons.beanutils=warn -log4j.logger.com.datatorrent=debug -log4j.logger.org.apache.apex=debug http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/mroperator/pom.xml ---------------------------------------------------------------------- diff --git a/demos/mroperator/pom.xml b/demos/mroperator/pom.xml deleted file mode 100644 index fcc30ff..0000000 --- a/demos/mroperator/pom.xml +++ /dev/null @@ -1,56 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. 
You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, - software distributed under the License is distributed on an - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - KIND, either express or implied. See the License for the - specific language governing permissions and limitations - under the License. - ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - - <artifactId>mroperator</artifactId> - <packaging>jar</packaging> - - <name>Apache Apex Malhar MR Operator Demo</name> - <description></description> - - <parent> - <groupId>org.apache.apex</groupId> - <artifactId>malhar-demos</artifactId> - <version>3.7.0-SNAPSHOT</version> - </parent> - - <properties> - <skipTests>true</skipTests> - </properties> - - <dependencies> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-mapreduce-client-core</artifactId> - <version>${hadoop.version}</version> - <scope>provided</scope> - <exclusions> - <exclusion> - <groupId>*</groupId> - <artifactId>*</artifactId> - </exclusion> - </exclusions> - </dependency> - </dependencies> - -</project> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/mroperator/src/assemble/appPackage.xml ---------------------------------------------------------------------- diff --git a/demos/mroperator/src/assemble/appPackage.xml b/demos/mroperator/src/assemble/appPackage.xml deleted file mode 100644 index 4138cf2..0000000 --- a/demos/mroperator/src/assemble/appPackage.xml +++ /dev/null @@ -1,59 +0,0 @@ -<!-- - - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. 
The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, - software distributed under the License is distributed on an - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - KIND, either express or implied. See the License for the - specific language governing permissions and limitations - under the License. - ---> -<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd"> - <id>appPackage</id> - <formats> - <format>jar</format> - </formats> - <includeBaseDirectory>false</includeBaseDirectory> - <fileSets> - <fileSet> - <directory>${basedir}/target/</directory> - <outputDirectory>/app</outputDirectory> - <includes> - <include>${project.artifactId}-${project.version}.jar</include> - </includes> - </fileSet> - <fileSet> - <directory>${basedir}/target/deps</directory> - <outputDirectory>/lib</outputDirectory> - </fileSet> - <fileSet> - <directory>${basedir}/src/site/conf</directory> - <outputDirectory>/conf</outputDirectory> - <includes> - <include>*.xml</include> - </includes> - </fileSet> - <fileSet> - <directory>${basedir}/src/main/resources/META-INF</directory> - <outputDirectory>/META-INF</outputDirectory> - </fileSet> - <fileSet> - <directory>${basedir}/src/main/resources/app</directory> - <outputDirectory>/app</outputDirectory> - </fileSet> - </fileSets> - -</assembly> - http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/DateWritable.java ---------------------------------------------------------------------- 
diff --git a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/DateWritable.java b/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/DateWritable.java deleted file mode 100644 index 5dbd83f..0000000 --- a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/DateWritable.java +++ /dev/null @@ -1,80 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package com.datatorrent.demos.mroperator; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.text.SimpleDateFormat; -import java.util.Date; - -import org.apache.hadoop.io.WritableComparable; - -/** - * <p>DateWritable class.</p> - * - * @since 0.9.0 - */ -public class DateWritable implements WritableComparable<DateWritable> -{ - private static final SimpleDateFormat formatter = new SimpleDateFormat( "yyyy-MM-dd' T 'HH:mm:ss.SSS" ); - private Date date; - - public Date getDate() - { - return date; - } - - public void setDate( Date date ) - { - this.date = date; - } - - public void readFields( DataInput in ) throws IOException - { - date = new Date( in.readLong() ); - } - - public void write( DataOutput out ) throws IOException - { - out.writeLong( date.getTime() ); - } - - @Override - public boolean equals(Object o) - { - return toString().equals(o.toString()); - } - - @Override - public int hashCode() - { - return toString().hashCode(); - } - - public String toString() - { - return formatter.format( date); - } - - public int compareTo( DateWritable other ) - { - return date.compareTo( other.getDate() ); - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/HdfsKeyValOutputOperator.java ---------------------------------------------------------------------- diff --git a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/HdfsKeyValOutputOperator.java b/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/HdfsKeyValOutputOperator.java deleted file mode 100644 index c4b9c49..0000000 --- a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/HdfsKeyValOutputOperator.java +++ /dev/null @@ -1,41 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.demos.mroperator; - -import com.datatorrent.lib.io.fs.AbstractSingleFileOutputOperator; -import com.datatorrent.lib.util.KeyHashValPair; - -/** - * Adapter for writing KeyHashValPair objects to HDFS - * <p> - * Serializes tuples into a HDFS file.<br/> - * </p> - * - * @param <K> Key type - * @param <V> Value type - * @since 0.9.4 - */ -public class HdfsKeyValOutputOperator<K, V> extends AbstractSingleFileOutputOperator<KeyHashValPair<K, V>> -{ - @Override - public byte[] getBytesForTuple(KeyHashValPair<K,V> t) - { - return (t.toString() + "\n").getBytes(); - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/InvertedIndexApplication.java ---------------------------------------------------------------------- diff --git a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/InvertedIndexApplication.java b/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/InvertedIndexApplication.java deleted file mode 100644 index 076b8ac..0000000 --- a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/InvertedIndexApplication.java +++ /dev/null @@ -1,45 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more 
contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.demos.mroperator; - -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapred.TextInputFormat; - -import com.datatorrent.api.annotation.ApplicationAnnotation; - -/** - * <p>InvertedIndexApplication class.</p> - * - * @since 0.9.0 - */ -@ApplicationAnnotation(name = "InvertedIndexDemo") -public class InvertedIndexApplication extends MapReduceApplication<LongWritable, Text, Text, Text> -{ - - InvertedIndexApplication() - { - setMapClass(LineIndexer.LineIndexMapper.class); - setReduceClass(LineIndexer.LineIndexReducer.class); - - setInputFormat(TextInputFormat.class); - - } - -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/LineIndexer.java ---------------------------------------------------------------------- diff --git a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/LineIndexer.java b/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/LineIndexer.java deleted file mode 100644 index e963954..0000000 --- a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/LineIndexer.java +++ /dev/null @@ -1,120 +0,0 @@ -/** - * Licensed to the Apache 
Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.demos.mroperator; - -import java.io.IOException; -import java.util.Iterator; -import java.util.StringTokenizer; - -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapred.FileInputFormat; -import org.apache.hadoop.mapred.FileOutputFormat; -import org.apache.hadoop.mapred.FileSplit; -import org.apache.hadoop.mapred.JobClient; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.MapReduceBase; -import org.apache.hadoop.mapred.Mapper; -import org.apache.hadoop.mapred.OutputCollector; -import org.apache.hadoop.mapred.Reducer; -import org.apache.hadoop.mapred.Reporter; - -/** - * <p>LineIndexer class.</p> - * - * @since 0.9.0 - */ -public class LineIndexer -{ - - public static class LineIndexMapper extends MapReduceBase - implements Mapper<LongWritable, Text, Text, Text> - { - private static final Text word = new Text(); - private static final Text location = new Text(); - - public void map(LongWritable key, Text val, - OutputCollector<Text, Text> output, Reporter reporter) throws IOException - { - FileSplit fileSplit = (FileSplit)reporter.getInputSplit(); - 
String fileName = fileSplit.getPath().getName(); - location.set(fileName); - - String line = val.toString(); - StringTokenizer itr = new StringTokenizer(line.toLowerCase()); - while (itr.hasMoreTokens()) { - word.set(itr.nextToken()); - output.collect(word, location); - } - } - } - - - - public static class LineIndexReducer extends MapReduceBase - implements Reducer<Text, Text, Text, Text> - { - public void reduce(Text key, Iterator<Text> values, - OutputCollector<Text, Text> output, Reporter reporter) throws IOException - { - boolean first = true; - StringBuilder toReturn = new StringBuilder(); - while (values.hasNext()) { - if (!first) { - toReturn.append(", "); - } - first = false; - toReturn.append(values.next().toString()); - } - - output.collect(key, new Text(toReturn.toString())); - } - } - - - /** - * The actual main() method for our program; this is the - * "driver" for the MapReduce job. - */ - public static void main(String[] args) - { - JobClient client = new JobClient(); - JobConf conf = new JobConf(LineIndexer.class); - - conf.setJobName("LineIndexer"); - - conf.setOutputKeyClass(Text.class); - conf.setOutputValueClass(Text.class); - - FileInputFormat.addInputPath(conf, new Path("input")); - FileOutputFormat.setOutputPath(conf, new Path("output")); - - conf.setMapperClass(LineIndexMapper.class); - conf.setReducerClass(LineIndexReducer.class); - - client.setConf(conf); - - try { - JobClient.runJob(conf); - } catch (Exception e) { - e.printStackTrace(); - } - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/LogCountsPerHour.java ---------------------------------------------------------------------- diff --git a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/LogCountsPerHour.java b/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/LogCountsPerHour.java deleted file mode 100644 index 69ee892..0000000 --- 
a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/LogCountsPerHour.java +++ /dev/null @@ -1,187 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.demos.mroperator; - -import java.io.IOException; -import java.util.Calendar; -import java.util.Iterator; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.conf.Configured; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapred.FileInputFormat; -import org.apache.hadoop.mapred.FileOutputFormat; -import org.apache.hadoop.mapred.JobClient; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.MapReduceBase; -import org.apache.hadoop.mapred.Mapper; -import org.apache.hadoop.mapred.OutputCollector; -import org.apache.hadoop.mapred.Reducer; -import org.apache.hadoop.mapred.Reporter; -import org.apache.hadoop.mapred.TextOutputFormat; -import org.apache.hadoop.util.Tool; -import org.apache.hadoop.util.ToolRunner; - -/** - * <p>LogCountsPerHour class.</p> - * - * @since 0.9.0 - */ -public class LogCountsPerHour extends Configured implements Tool -{ - - 
public static class LogMapClass extends MapReduceBase - implements Mapper<LongWritable, Text, DateWritable, IntWritable> - { - private DateWritable date = new DateWritable(); - private static final IntWritable one = new IntWritable(1); - - public void map(LongWritable key, Text value, OutputCollector<DateWritable, IntWritable> output, Reporter reporter) throws IOException - { - // Get the value as a String; it is of the format: - // 111.111.111.111 - - [16/Dec/2012:05:32:50 -0500] "GET / HTTP/1.1" 200 14791 "-" "Mozilla/5.0 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)" - String text = value.toString(); - - // Get the date and time - int openBracket = text.indexOf('['); - int closeBracket = text.indexOf(']'); - if (openBracket != -1 && closeBracket != -1) { - // Read the date - String dateString = text.substring(text.indexOf('[') + 1, text.indexOf(']')); - - // Build a date object from a string of the form: 16/Dec/2012:05:32:50 -0500 - int index = 0; - int nextIndex = dateString.indexOf('/'); - int day = Integer.parseInt(dateString.substring(index, nextIndex)); - - index = nextIndex; - nextIndex = dateString.indexOf('/', index + 1); - String month = dateString.substring(index + 1, nextIndex); - - index = nextIndex; - nextIndex = dateString.indexOf(':', index); - int year = Integer.parseInt(dateString.substring(index + 1, nextIndex)); - - index = nextIndex; - nextIndex = dateString.indexOf(':', index + 1); - int hour = Integer.parseInt(dateString.substring(index + 1, nextIndex)); - - // Build a calendar object for this date - Calendar calendar = Calendar.getInstance(); - calendar.set(Calendar.DATE, day); - calendar.set(Calendar.YEAR, year); - calendar.set(Calendar.HOUR, hour); - calendar.set(Calendar.MINUTE, 0); - calendar.set(Calendar.SECOND, 0); - calendar.set(Calendar.MILLISECOND, 0); - - if (month.equalsIgnoreCase("dec")) { - calendar.set(Calendar.MONTH, Calendar.DECEMBER); - } else if (month.equalsIgnoreCase("nov")) { - 
calendar.set(Calendar.MONTH, Calendar.NOVEMBER); - } else if (month.equalsIgnoreCase("oct")) { - calendar.set(Calendar.MONTH, Calendar.OCTOBER); - } else if (month.equalsIgnoreCase("sep")) { - calendar.set(Calendar.MONTH, Calendar.SEPTEMBER); - } else if (month.equalsIgnoreCase("aug")) { - calendar.set(Calendar.MONTH, Calendar.AUGUST); - } else if (month.equalsIgnoreCase("jul")) { - calendar.set(Calendar.MONTH, Calendar.JULY); - } else if (month.equalsIgnoreCase("jun")) { - calendar.set(Calendar.MONTH, Calendar.JUNE); - } else if (month.equalsIgnoreCase("may")) { - calendar.set(Calendar.MONTH, Calendar.MAY); - } else if (month.equalsIgnoreCase("apr")) { - calendar.set(Calendar.MONTH, Calendar.APRIL); - } else if (month.equalsIgnoreCase("mar")) { - calendar.set(Calendar.MONTH, Calendar.MARCH); - } else if (month.equalsIgnoreCase("feb")) { - calendar.set(Calendar.MONTH, Calendar.FEBRUARY); - } else if (month.equalsIgnoreCase("jan")) { - calendar.set(Calendar.MONTH, Calendar.JANUARY); - } - - - // Output the date as the key and 1 as the value - date.setDate(calendar.getTime()); - output.collect(date, one); - } - } - } - - public static class LogReduce extends MapReduceBase - implements Reducer<DateWritable, IntWritable, DateWritable, IntWritable> - { - public void reduce(DateWritable key, Iterator<IntWritable> values, OutputCollector<DateWritable, IntWritable> output, Reporter reporter) throws IOException - { - // Iterate over all of the values (counts of occurrences of this word) - int count = 0; - while (values.hasNext()) { - // Add the value to our count - count += values.next().get(); - } - - // Output the word with its count (wrapped in an IntWritable) - output.collect(key, new IntWritable(count)); - } - } - - - public int run(String[] args) throws Exception - { - // Create a configuration - Configuration conf = getConf(); - - // Create a job from the default configuration that will use the WordCount class - JobConf job = new JobConf(conf, 
LogCountsPerHour.class); - - // Define our input path as the first command line argument and our output path as the second - Path in = new Path(args[0]); - Path out = new Path(args[1]); - - // Create File Input/Output formats for these paths (in the job) - FileInputFormat.setInputPaths(job, in); - FileOutputFormat.setOutputPath(job, out); - - // Configure the job: name, mapper, reducer, and combiner - job.setJobName("LogAveragePerHour"); - job.setMapperClass(LogMapClass.class); - job.setReducerClass(LogReduce.class); - job.setCombinerClass(LogReduce.class); - - // Configure the output - job.setOutputFormat(TextOutputFormat.class); - job.setOutputKeyClass(DateWritable.class); - job.setOutputValueClass(IntWritable.class); - - // Run the job - JobClient.runJob(job); - return 0; - } - - public static void main(String[] args) throws Exception - { - // Start the LogCountsPerHour MapReduce application - int res = ToolRunner.run(new Configuration(), new LogCountsPerHour(), args); - System.exit(res); - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/LogsCountApplication.java ---------------------------------------------------------------------- diff --git a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/LogsCountApplication.java b/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/LogsCountApplication.java deleted file mode 100644 index 2d647ed..0000000 --- a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/LogsCountApplication.java +++ /dev/null @@ -1,46 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.demos.mroperator; - -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapred.TextInputFormat; - -import com.datatorrent.api.annotation.ApplicationAnnotation; - -/** - * <p>LogsCountApplication class.</p> - * - * @since 0.9.0 - */ -@ApplicationAnnotation(name = "LogsCountDemo") -public class LogsCountApplication extends MapReduceApplication<LongWritable, Text, DateWritable, IntWritable> -{ - - public void LogsCountApplication() - { - setMapClass(LogCountsPerHour.LogMapClass.class); - // setCombineClass(LogCountsPerHour.LogReduce.class); - setReduceClass(LogCountsPerHour.LogReduce.class); - setInputFormat(TextInputFormat.class); - - } - -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/MapOperator.java ---------------------------------------------------------------------- diff --git a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/MapOperator.java b/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/MapOperator.java deleted file mode 100644 index 509f6ae..0000000 --- a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/MapOperator.java +++ /dev/null @@ -1,414 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or 
more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.demos.mroperator; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; - -import javax.validation.constraints.Min; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.serializer.Deserializer; -import org.apache.hadoop.io.serializer.SerializationFactory; -import org.apache.hadoop.io.serializer.Serializer; -import org.apache.hadoop.mapred.Counters; -import org.apache.hadoop.mapred.FileInputFormat; -import org.apache.hadoop.mapred.InputFormat; -import org.apache.hadoop.mapred.InputSplit; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.KeyValueTextInputFormat; -import org.apache.hadoop.mapred.Mapper; -import org.apache.hadoop.mapred.OutputCollector; -import org.apache.hadoop.mapred.RecordReader; -import org.apache.hadoop.mapred.Reducer; -import org.apache.hadoop.mapred.Reporter; -import org.apache.hadoop.mapred.TextInputFormat; 
- -import com.datatorrent.api.Context.OperatorContext; -import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.api.DefaultPartition; -import com.datatorrent.api.InputOperator; -import com.datatorrent.api.Partitioner; - -import com.datatorrent.demos.mroperator.ReporterImpl.ReporterType; -import com.datatorrent.lib.util.KeyHashValPair; - -/** - * <p> - * MapOperator class. - * </p> - * - * @since 0.9.0 - */ -@SuppressWarnings({ "unchecked"}) -public class MapOperator<K1, V1, K2, V2> implements InputOperator, Partitioner<MapOperator<K1, V1, K2, V2>> -{ - - private static final Logger logger = LoggerFactory.getLogger(MapOperator.class); - private String dirName; - private boolean emitPartitioningCountOnce = false; - private boolean emitLastCountOnce = false; - private int operatorId; - private Class<? extends InputFormat<K1, V1>> inputFormatClass; - private transient InputFormat<K1, V1> inputFormat; - private transient InputSplit inputSplit; - private Class<? extends InputSplit> inputSplitClass; - private ByteArrayOutputStream outstream = new ByteArrayOutputStream(); - private transient RecordReader<K1, V1> reader; - private boolean emittedAll = false; - public final transient DefaultOutputPort<KeyHashValPair<Integer, Integer>> outputCount = new DefaultOutputPort<KeyHashValPair<Integer, Integer>>(); - public final transient DefaultOutputPort<KeyHashValPair<K2, V2>> output = new DefaultOutputPort<KeyHashValPair<K2, V2>>(); - private transient JobConf jobConf; - @Min(1) - private int partitionCount = 1; - - public Class<? extends InputSplit> getInputSplitClass() - { - return inputSplitClass; - } - - public void setInputSplitClass(Class<? extends InputSplit> inputSplitClass) - { - this.inputSplitClass = inputSplitClass; - } - - public Class<? extends InputFormat<K1, V1>> getInputFormatClass() - { - return inputFormatClass; - } - - public void setInputFormatClass(Class<? 
extends InputFormat<K1, V1>> inputFormatClass) - { - this.inputFormatClass = inputFormatClass; - } - - public String getDirName() - { - return dirName; - } - - public void setDirName(String dirName) - { - this.dirName = dirName; - } - - public int getPartitionCount() - { - return partitionCount; - } - - public void setPartitionCount(int partitionCount) - { - this.partitionCount = partitionCount; - } - - @Override - public void beginWindow(long windowId) - { - if (!emitPartitioningCountOnce) { - outputCount.emit(new KeyHashValPair<Integer, Integer>(operatorId, 1)); - emitPartitioningCountOnce = true; - } - if (reader == null) { - try { - reader = inputFormat.getRecordReader(inputSplit, new JobConf(new Configuration()), reporter); - } catch (IOException e) { - logger.info("error getting record reader {}", e.getMessage()); - } - } - } - - @Override - public void teardown() - { - - } - - @Override - public void setup(OperatorContext context) - { - if (context != null) { - operatorId = context.getId(); - } - reporter = new ReporterImpl(ReporterType.Mapper, new Counters()); - outputCollector = new OutputCollectorImpl<K2, V2>(); - Configuration conf = new Configuration(); - try { - inputFormat = inputFormatClass.newInstance(); - SerializationFactory serializationFactory = new SerializationFactory(conf); - Deserializer keyDesiralizer = serializationFactory.getDeserializer(inputSplitClass); - keyDesiralizer.open(new ByteArrayInputStream(outstream.toByteArray())); - inputSplit = (InputSplit)keyDesiralizer.deserialize(null); - ((ReporterImpl)reporter).setInputSplit(inputSplit); - reader = inputFormat.getRecordReader(inputSplit, new JobConf(conf), reporter); - } catch (Exception e) { - logger.info("failed to initialize inputformat obj {}", inputFormat); - throw new RuntimeException(e); - } - InputStream stream = null; - if (configFile != null && configFile.length() > 0) { - stream = ClassLoader.getSystemResourceAsStream("/" + configFile); - if (stream == null) { - stream = 
ClassLoader.getSystemResourceAsStream(configFile); - } - } - if (stream != null) { - conf.addResource(stream); - } - jobConf = new JobConf(conf); - if (mapClass != null) { - try { - mapObject = mapClass.newInstance(); - } catch (Exception e) { - logger.info("can't instantiate object {}", e.getMessage()); - } - - mapObject.configure(jobConf); - } - if (combineClass != null) { - try { - combineObject = combineClass.newInstance(); - } catch (Exception e) { - logger.info("can't instantiate object {}", e.getMessage()); - } - combineObject.configure(jobConf); - } - } - - @Override - public void emitTuples() - { - if (!emittedAll) { - try { - K1 key = reader.createKey(); - V1 val = reader.createValue(); - emittedAll = !reader.next(key, val); - if (!emittedAll) { - KeyHashValPair<K1, V1> keyValue = new KeyHashValPair<K1, V1>(key, val); - mapObject.map(keyValue.getKey(), keyValue.getValue(), outputCollector, reporter); - if (combineObject == null) { - List<KeyHashValPair<K2, V2>> list = ((OutputCollectorImpl<K2, V2>)outputCollector).getList(); - for (KeyHashValPair<K2, V2> e : list) { - output.emit(e); - } - list.clear(); - } - } - } catch (IOException ex) { - logger.debug(ex.toString()); - throw new RuntimeException(ex); - } - } - } - - @Override - public void endWindow() - { - List<KeyHashValPair<K2, V2>> list = ((OutputCollectorImpl<K2, V2>)outputCollector).getList(); - if (combineObject != null) { - Map<K2, List<V2>> cacheObject = new HashMap<K2, List<V2>>(); - for (KeyHashValPair<K2, V2> tuple : list) { - List<V2> cacheList = cacheObject.get(tuple.getKey()); - if (cacheList == null) { - cacheList = new ArrayList<V2>(); - cacheList.add(tuple.getValue()); - cacheObject.put(tuple.getKey(), cacheList); - } else { - cacheList.add(tuple.getValue()); - } - } - list.clear(); - OutputCollector<K2, V2> tempOutputCollector = new OutputCollectorImpl<K2, V2>(); - for (Map.Entry<K2, List<V2>> e : cacheObject.entrySet()) { - try { - combineObject.reduce(e.getKey(), 
e.getValue().iterator(), tempOutputCollector, reporter); - } catch (IOException e1) { - logger.info(e1.getMessage()); - } - } - list = ((OutputCollectorImpl<K2, V2>)tempOutputCollector).getList(); - for (KeyHashValPair<K2, V2> e : list) { - output.emit(e); - } - } - if (!emitLastCountOnce && emittedAll) { - outputCount.emit(new KeyHashValPair<Integer, Integer>(operatorId, -1)); - logger.info("emitting end of file {}", new KeyHashValPair<Integer, Integer>(operatorId, -1)); - emitLastCountOnce = true; - } - list.clear(); - } - - private InputSplit[] getSplits(JobConf conf, int numSplits, String path) throws Exception - { - FileInputFormat.setInputPaths(conf, new Path(path)); - if (inputFormat == null) { - inputFormat = inputFormatClass.newInstance(); - String inputFormatClassName = inputFormatClass.getName(); - if (inputFormatClassName.equals("org.apache.hadoop.mapred.TextInputFormat")) { - ((TextInputFormat)inputFormat).configure(conf); - } else if (inputFormatClassName.equals("org.apache.hadoop.mapred.KeyValueTextInputFormat")) { - ((KeyValueTextInputFormat)inputFormat).configure(conf); - } - } - return inputFormat.getSplits(conf, numSplits); - // return null; - } - - @Override - public void partitioned(Map<Integer, Partition<MapOperator<K1, V1, K2, V2>>> partitions) - { - } - - @SuppressWarnings("rawtypes") - @Override - public Collection<Partition<MapOperator<K1, V1, K2, V2>>> definePartitions(Collection<Partition<MapOperator<K1, V1, K2, V2>>> partitions, PartitioningContext context) - { - int tempPartitionCount = partitionCount; - - Collection c = partitions; - Collection<Partition<MapOperator<K1, V1, K2, V2>>> operatorPartitions = c; - Partition<MapOperator<K1, V1, K2, V2>> template; - Iterator<Partition<MapOperator<K1, V1, K2, V2>>> itr = operatorPartitions.iterator(); - template = itr.next(); - Configuration conf = new Configuration(); - SerializationFactory serializationFactory = new SerializationFactory(conf); - if (outstream.size() == 0) { - InputSplit[] 
splits; - try { - splits = getSplits(new JobConf(conf), tempPartitionCount, template.getPartitionedInstance().getDirName()); - } catch (Exception e1) { - logger.info(" can't get splits {}", e1.getMessage()); - throw new RuntimeException(e1); - } - Collection<Partition<MapOperator<K1, V1, K2, V2>>> operList = new ArrayList<Partition<MapOperator<K1, V1, K2, V2>>>(); - itr = operatorPartitions.iterator(); - int size = splits.length; - Serializer keySerializer = serializationFactory.getSerializer(splits[0].getClass()); - while (size > 0 && itr.hasNext()) { - Partition<MapOperator<K1, V1, K2, V2>> p = itr.next(); - MapOperator<K1, V1, K2, V2> opr = p.getPartitionedInstance(); - opr.setInputFormatClass(inputFormatClass); - opr.setMapClass(mapClass); - opr.setCombineClass(combineClass); - opr.setConfigFile(configFile); - try { - keySerializer.open(opr.getOutstream()); - keySerializer.serialize(splits[size - 1]); - opr.setInputSplitClass(splits[size - 1].getClass()); - } catch (IOException e) { - logger.info("error while serializing {}", e.getMessage()); - } - size--; - operList.add(p); - } - while (size > 0) { - MapOperator<K1, V1, K2, V2> opr = new MapOperator<K1, V1, K2, V2>(); - opr.setInputFormatClass(inputFormatClass); - opr.setMapClass(mapClass); - opr.setCombineClass(combineClass); - opr.setConfigFile(configFile); - try { - keySerializer.open(opr.getOutstream()); - keySerializer.serialize(splits[size - 1]); - opr.setInputSplitClass(splits[size - 1].getClass()); - } catch (IOException e) { - logger.info("error while serializing {}", e.getMessage()); - } - size--; - operList.add(new DefaultPartition<MapOperator<K1, V1, K2, V2>>(opr)); - } - try { - keySerializer.close(); - } catch (IOException e) { - throw new RuntimeException(e); - } - return operList; - } - return null; - } - - public ByteArrayOutputStream getOutstream() - { - return outstream; - } - - public void setOutstream(ByteArrayOutputStream outstream) - { - this.outstream = outstream; - } - - /** - * adding 
map code - */ - - private Class<? extends Mapper<K1, V1, K2, V2>> mapClass; - private Class<? extends Reducer<K2, V2, K2, V2>> combineClass; - - private transient Mapper<K1, V1, K2, V2> mapObject; - private transient Reducer<K2, V2, K2, V2> combineObject; - private transient Reporter reporter; - - private String configFile; - - public String getConfigFile() - { - return configFile; - } - - public void setConfigFile(String configFile) - { - this.configFile = configFile; - } - - private transient OutputCollector<K2, V2> outputCollector; - - public Class<? extends Mapper<K1, V1, K2, V2>> getMapClass() - { - return mapClass; - } - - public void setMapClass(Class<? extends Mapper<K1, V1, K2, V2>> mapClass) - { - this.mapClass = mapClass; - } - - public Class<? extends Reducer<K2, V2, K2, V2>> getCombineClass() - { - return combineClass; - } - - public void setCombineClass(Class<? extends Reducer<K2, V2, K2, V2>> combineClass) - { - this.combineClass = combineClass; - } - -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/MapReduceApplication.java ---------------------------------------------------------------------- diff --git a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/MapReduceApplication.java b/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/MapReduceApplication.java deleted file mode 100644 index 9fffc3f..0000000 --- a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/MapReduceApplication.java +++ /dev/null @@ -1,114 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */
package com.datatorrent.demos.mroperator;

import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.Reducer;
import com.datatorrent.api.DAG;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.ApplicationAnnotation;


/**
 * <p>
 * Abstract MapReduceApplication class.
 * </p>
 * Wires a {@link MapOperator}, a {@code ReduceOperator} and an {@code HdfsKeyValOutputOperator} into a DAG
 * using the input format, mapper, reducer and combiner classes set on this application.
 *
 * @since 0.9.0
 */
@ApplicationAnnotation(name = "MapReduceDemo")
public abstract class MapReduceApplication<K1, V1, K2, V2> implements StreamingApplication
{
  Class<? extends InputFormat<K1, V1>> inputFormat;
  Class<? extends Mapper<K1, V1, K2, V2>> mapClass;
  Class<? extends Reducer<K2, V2, K2, V2>> reduceClass;
  Class<? extends Reducer<K2, V2, K2, V2>> combineClass;

  public Class<? extends Reducer<K2, V2, K2, V2>> getCombineClass()
  {
    return combineClass;
  }

  public void setCombineClass(Class<? extends Reducer<K2, V2, K2, V2>> combineClass)
  {
    this.combineClass = combineClass;
  }

  public void setInputFormat(Class<? extends InputFormat<K1, V1>> inputFormat)
  {
    this.inputFormat = inputFormat;
  }

  public Class<? extends Mapper<K1, V1, K2, V2>> getMapClass()
  {
    return mapClass;
  }

  public void setMapClass(Class<? extends Mapper<K1, V1, K2, V2>> mapClass)
  {
    this.mapClass = mapClass;
  }

  public Class<? extends Reducer<K2, V2, K2, V2>> getReduceClass()
  {
    return reduceClass;
  }

  public void setReduceClass(Class<? extends Reducer<K2, V2, K2, V2>> reduceClass)
  {
    this.reduceClass = reduceClass;
  }


  /**
   * Adds the "Mapper", "Reducer" and "Console" operators to the DAG and connects them with the
   * "Mapped-Output", "Mapper-Count" and "Reduced-Output" streams.
   * <p>
   * The property {@code <SimpleClassName>.configFile} is read from {@code conf}; only the last
   * {@code /}-separated segment of its value is passed to the mapper and the reducer as config file name.
   * When the property is unset, empty or contains no segment, {@code null} is passed.
   * </p>
   *
   * @param dag DAG to populate
   * @param conf configuration the config file property is read from
   */
  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {
    String configurationFilePath = conf.get(this.getClass().getSimpleName() + ".configFile", "");
    String configFileName = lastPathSegment(configurationFilePath);

    MapOperator<K1, V1, K2, V2> inputOperator = dag.addOperator("Mapper", new MapOperator<K1, V1, K2, V2>());
    inputOperator.setInputFormatClass(inputFormat);
    inputOperator.setMapClass(mapClass);
    inputOperator.setConfigFile(configFileName);
    inputOperator.setCombineClass(combineClass);

    ReduceOperator<K2, V2, K2, V2> reduceOpr = dag.addOperator("Reducer", new ReduceOperator<K2, V2, K2, V2>());
    reduceOpr.setReduceClass(reduceClass);
    reduceOpr.setConfigFile(configFileName);

    HdfsKeyValOutputOperator<K2, V2> console = dag.addOperator("Console", new HdfsKeyValOutputOperator<K2, V2>());

    dag.addStream("Mapped-Output", inputOperator.output, reduceOpr.input);
    dag.addStream("Mapper-Count", inputOperator.outputCount, reduceOpr.inputCount);
    dag.addStream("Reduced-Output", reduceOpr.output, console.input);
  }

  /**
   * Returns the last non-empty {@code /}-separated segment of {@code path}, or {@code null} when
   * {@code path} is null, empty or consists of separators only (the latter used to raise a
   * {@code NoSuchElementException}).
   */
  private static String lastPathSegment(String path)
  {
    if (path == null || path.isEmpty()) {
      return null;
    }
    String segment = null;
    StringTokenizer tokenizer = new StringTokenizer(path, "/");
    while (tokenizer.hasMoreTokens()) {
      segment = tokenizer.nextToken();
    }
    return segment;
  }
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/NewWordCountApplication.java ---------------------------------------------------------------------- diff --git a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/NewWordCountApplication.java b/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/NewWordCountApplication.java deleted file mode 100644 index b0ea7d8..0000000 ---
a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/NewWordCountApplication.java +++ /dev/null @@ -1,44 +0,0 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package com.datatorrent.demos.mroperator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.TextInputFormat;

import com.datatorrent.api.annotation.ApplicationAnnotation;

/**
 * <p>NewWordCountApplication class.</p>
 * Word-count flavour of {@link MapReduceApplication} that uses {@code WordCount.Map} as mapper,
 * {@code WordCount.Reduce} as reducer and combiner, and {@link TextInputFormat} as input format.
 *
 * @since 0.9.0
 */
@ApplicationAnnotation(name = "WordCountDemo")
public class NewWordCountApplication extends MapReduceApplication<LongWritable, Text, Text, IntWritable>
{

  /**
   * Creates the application with the word-count mapper, reducer, combiner and input format set.
   * The original declaration carried a {@code void} return type, which made it an ordinary method
   * that was never run on construction.
   */
  public NewWordCountApplication()
  {
    applyWordCountSettings();
  }

  /**
   * Re-applies the word-count settings.
   *
   * @deprecated this method only exists because the constructor was originally declared with a
   *             {@code void} return type; the constructor now applies the same settings.
   */
  @Deprecated
  public void NewWordCountApplication()
  {
    applyWordCountSettings();
  }

  /** Sets the mapper, reducer, combiner and input format used by the word-count demo. */
  private void applyWordCountSettings()
  {
    setMapClass(WordCount.Map.class);
    setReduceClass(WordCount.Reduce.class);
    setCombineClass(WordCount.Reduce.class);
    setInputFormat(TextInputFormat.class);
  }
}
