http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mrmonitor/src/main/java/org/apache/apex/examples/mrmonitor/MRStatusObject.java ---------------------------------------------------------------------- diff --git a/examples/mrmonitor/src/main/java/org/apache/apex/examples/mrmonitor/MRStatusObject.java b/examples/mrmonitor/src/main/java/org/apache/apex/examples/mrmonitor/MRStatusObject.java new file mode 100644 index 0000000..ae8b551 --- /dev/null +++ b/examples/mrmonitor/src/main/java/org/apache/apex/examples/mrmonitor/MRStatusObject.java @@ -0,0 +1,501 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.apex.examples.mrmonitor;

import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * <p>
 * Holds the connection details and the latest fetched JSON of one monitored MapReduce job, plus a
 * rolling history (one sample per minute) of map/reduce progress and memory/CPU counters.
 * </p>
 * <p>
 * Every instance starts its own single-threaded scheduler in the constructor. The scheduler thread
 * reads fields that other threads write through the setters, so those fields are {@code volatile}
 * and the history queues are concurrent collections.
 * </p>
 *
 * @since 0.3.4
 */
public class MRStatusObject
{
  private static final Logger logger = LoggerFactory.getLogger(MRStatusObject.class);

  private String command;
  /**
   * This stores the Resource Manager/ Task Manager's host information
   */
  private String uri;
  /**
   * This field stores the job id
   */
  private String jobId;
  /**
   * This field stores the api version of the rest apis
   */
  private String apiVersion;
  /**
   * This field stores the hadoop version 1 for 1.x and 2 for 2.x
   */
  private volatile int hadoopVersion;
  /**
   * This field stores the app id for the hadoop 2.x
   */
  private String appId;
  /**
   * This field stores the RM port information for hadoop 2.x / Task Manager server port for hadoop 1.X from where we
   * can get the job information
   */
  private int rmPort;
  /**
   * This field stores the history server information for hadoop 2.x from where we can get the job information
   */
  private int historyServerPort;
  /**
   * This field stores the job information as json object
   */
  private volatile JSONObject jsonObject;
  /**
   * This field tells if the object has been modified
   */
  private boolean modified;
  /**
   * This stores the mapping of map task ids to the TaskObject
   */
  private Map<String, TaskObject> mapJsonObject;
  /**
   * This stores the mapping of reduce task ids to the TaskObject
   */
  private Map<String, TaskObject> reduceJsonObject;
  /**
   * This holds the information about the various metrics like MAP_OUTPUT_RECORDS etc for this job
   */
  private volatile TaskObject metricObject;

  /**
   * This holds the number of windows occurred when the new data was retrieved for this job
   */
  private int retrials;

  /**
   * The scheduler is used to store the job status every 1 minute
   */
  private transient ScheduledExecutorService statusScheduler;

  /**
   * This stores the progress of the map tasks
   */
  Queue<String> mapStatusHistory;

  /**
   * This stores the progress of the reduce tasks
   */
  Queue<String> reduceStatusHistory;

  /**
   * This stores the history of the physical memory usage
   */
  Queue<String> physicalMemoryStatusHistory;

  /**
   * This stores the history of the virtual memory usage
   */
  Queue<String> virtualMemoryStatusHistory;

  /**
   * This stores the history of the cpu
   */
  Queue<String> cpuStatusHistory;

  /**
   * The number of minutes for which the status history of map and reduce tasks is stored
   */
  private volatile int statusHistoryCount = 60;

  /**
   * This field notifies if history status queues have changed over time
   */
  private volatile boolean changedHistoryStatus;

  /**
   * Creates an empty status object and starts the once-a-minute history sampler.
   * <p>
   * The sampler runs on a daemon thread so that an instance which is never stopped cannot keep the
   * JVM alive; call {@link #stopStatusScheduler()} when the job is no longer monitored.
   * </p>
   */
  public MRStatusObject()
  {
    retrials = 0;
    modified = true;
    mapJsonObject = new ConcurrentHashMap<String, TaskObject>();
    reduceJsonObject = new ConcurrentHashMap<String, TaskObject>();
    // Concurrent queues: the scheduler thread appends while other threads read through the getters.
    mapStatusHistory = new ConcurrentLinkedQueue<String>();
    reduceStatusHistory = new ConcurrentLinkedQueue<String>();
    physicalMemoryStatusHistory = new ConcurrentLinkedQueue<String>();
    virtualMemoryStatusHistory = new ConcurrentLinkedQueue<String>();
    cpuStatusHistory = new ConcurrentLinkedQueue<String>();
    statusScheduler = Executors.newScheduledThreadPool(1, new ThreadFactory()
    {
      @Override
      public Thread newThread(Runnable task)
      {
        Thread thread = new Thread(task, "mr-status-history");
        thread.setDaemon(true);
        return thread;
      }
    });
    statusScheduler.scheduleAtFixedRate(new Runnable()
    {
      @Override
      public void run()
      {
        try {
          recordStatusHistory();
        } catch (RuntimeException e) {
          // An exception escaping run() would cancel every later execution of this periodic task.
          logger.error("error setting status history {}", e.getMessage(), e);
        }
      }
    }, 0, 1, TimeUnit.MINUTES);
  }

  /**
   * Stops the background history sampler. Safe to call more than once.
   */
  public void stopStatusScheduler()
  {
    ScheduledExecutorService scheduler = statusScheduler;
    if (scheduler != null) {
      scheduler.shutdownNow();
    }
  }

  /**
   * Takes one sample of the current job JSON and appends it to the history queues, then trims each
   * queue to {@link #statusHistoryCount} entries. Does nothing until a job JSON has been set.
   */
  private void recordStatusHistory()
  {
    final JSONObject snapshot = jsonObject;
    if (snapshot == null) {
      return;
    }
    changedHistoryStatus = true;
    try {
      if (hadoopVersion == 2) {
        recordHadoop2Sample(snapshot);
      } else {
        recordHadoop1Sample(snapshot);
      }
    } catch (JSONException e) {
      logger.error("error setting status history {}", e.getMessage(), e);
    }
    // Trim every queue on its own: the counter queues may hold fewer samples than the progress queues.
    final int limit = statusHistoryCount;
    trim(mapStatusHistory, limit);
    trim(reduceStatusHistory, limit);
    trim(physicalMemoryStatusHistory, limit);
    trim(virtualMemoryStatusHistory, limit);
    trim(cpuStatusHistory, limit);
  }

  /**
   * Appends a sample taken from the hadoop 2.x job JSON and, when available, the job counters JSON.
   */
  private void recordHadoop2Sample(JSONObject snapshot) throws JSONException
  {
    mapStatusHistory.add(snapshot.getJSONObject("job").getString("mapProgress"));
    reduceStatusHistory.add(snapshot.getJSONObject("job").getString("reduceProgress"));
    final TaskObject metrics = metricObject;
    final JSONObject metricsJson = (metrics == null) ? null : metrics.getJson();
    if (metricsJson != null) {
      recordCounters(metricsJson.getJSONObject("jobCounters").getJSONArray("counterGroup"),
          "counterGroupName", "org.apache.hadoop.mapreduce.TaskCounter", "counter", "totalCounterValue",
          "PHYSICAL_MEMORY_BYTES", "VIRTUAL_MEMORY_BYTES", "CPU_MILLISECONDS");
    }
  }

  /**
   * Appends a sample taken from the hadoop 1.x job JSON.
   */
  private void recordHadoop1Sample(JSONObject snapshot) throws JSONException
  {
    mapStatusHistory.add(snapshot.getJSONObject("mapTaskSummary").getString("progressPercentage"));
    reduceStatusHistory.add(snapshot.getJSONObject("reduceTaskSummary").getString("progressPercentage"));
    recordCounters(snapshot.getJSONArray("jobCounterGroupsInfo"),
        "groupName", "Map-Reduce Framework", "jobCountersInfo", "totalValue",
        "Physical memory (bytes) snapshot", "Virtual memory (bytes) snapshot", "CPU time spent (ms)");
  }

  /**
   * Finds the first counter group named {@code groupName} and appends its physical memory, virtual
   * memory and CPU counter values to the matching history queues.
   */
  private void recordCounters(JSONArray groups, String groupNameKey, String groupName, String countersKey,
      String valueKey, String physicalName, String virtualName, String cpuName) throws JSONException
  {
    final int groupCount = groups.length();
    for (int i = 0; i < groupCount; i++) {
      final JSONObject group = groups.getJSONObject(i);
      if (!groupName.equals(group.get(groupNameKey))) {
        continue;
      }
      final JSONArray counters = group.getJSONArray(countersKey);
      for (int j = 0; j < counters.length(); j++) {
        final JSONObject counter = counters.getJSONObject(j);
        final Object name = counter.get("name");
        if (physicalName.equals(name)) {
          physicalMemoryStatusHistory.add(counter.getString(valueKey));
        } else if (virtualName.equals(name)) {
          virtualMemoryStatusHistory.add(counter.getString(valueKey));
        } else if (cpuName.equals(name)) {
          cpuStatusHistory.add(counter.getString(valueKey));
        }
      }
      return; // only the first matching group is used
    }
  }

  /**
   * Drops the oldest entries until {@code history} holds at most {@code limit} entries.
   */
  private static void trim(Queue<String> history, int limit)
  {
    while (history.size() > limit) {
      history.poll();
    }
  }

  /**
   * Null-safe equality used by {@link #equals(Object)}.
   */
  private static boolean sameValue(Object left, Object right)
  {
    return (left == null) ? (right == null) : left.equals(right);
  }

  public Map<String, TaskObject> getMapJsonObject()
  {
    return mapJsonObject;
  }

  public void setMapJsonObject(Map<String, TaskObject> mapJsonObject)
  {
    this.mapJsonObject = mapJsonObject;
  }

  public Map<String, TaskObject> getReduceJsonObject()
  {
    return reduceJsonObject;
  }

  public void setReduceJsonObject(Map<String, TaskObject> reduceJsonObject)
  {
    this.reduceJsonObject = reduceJsonObject;
  }

  public String getUri()
  {
    return uri;
  }

  public void setUri(String uri)
  {
    this.uri = uri;
  }

  public String getJobId()
  {
    return jobId;
  }

  public void setJobId(String jobId)
  {
    this.jobId = jobId;
  }

  public String getApiVersion()
  {
    return apiVersion;
  }

  public void setApiVersion(String apiVersion)
  {
    this.apiVersion = apiVersion;
  }

  public int getHadoopVersion()
  {
    return hadoopVersion;
  }

  public void setHadoopVersion(int hadoopVersion)
  {
    this.hadoopVersion = hadoopVersion;
  }

  public String getAppId()
  {
    return appId;
  }

  public void setAppId(String appId)
  {
    this.appId = appId;
  }

  public int getRmPort()
  {
    return rmPort;
  }

  public void setRmPort(int rmPort)
  {
    this.rmPort = rmPort;
  }

  public int getHistoryServerPort()
  {
    return historyServerPort;
  }

  public void setHistoryServerPort(int historyServerPort)
  {
    this.historyServerPort = historyServerPort;
  }

  public JSONObject getJsonObject()
  {
    return jsonObject;
  }

  public void setJsonObject(JSONObject jsonObject)
  {
    this.jsonObject = jsonObject;
  }

  public boolean isChangedHistoryStatus()
  {
    return changedHistoryStatus;
  }

  public void setChangedHistoryStatus(boolean changedHistoryStatus)
  {
    this.changedHistoryStatus = changedHistoryStatus;
  }

  /**
   * Two status objects are equal when they have the same uri, job id, api version and hadoop version.
   * The fields are compared directly; comparing hash codes would treat colliding jobs as equal.
   */
  @Override
  public boolean equals(Object that)
  {
    if (this == that) {
      return true;
    }
    if (!(that instanceof MRStatusObject)) {
      return false;
    }
    final MRStatusObject other = (MRStatusObject)that;
    return hadoopVersion == other.hadoopVersion
        && sameValue(uri, other.uri)
        && sameValue(jobId, other.jobId)
        && sameValue(apiVersion, other.apiVersion);
  }

  /**
   * Hash over the same four fields that {@link #equals(Object)} compares.
   */
  @Override
  public int hashCode()
  {
    return (uri + jobId + apiVersion + String.valueOf(hadoopVersion)).hashCode();
  }

  public String getCommand()
  {
    return command;
  }

  public void setCommand(String command)
  {
    this.command = command;
  }

  public boolean isModified()
  {
    return modified;
  }

  public void setModified(boolean modified)
  {
    this.modified = modified;
  }

  public int getRetrials()
  {
    return retrials;
  }

  public void setRetrials(int retrials)
  {
    this.retrials = retrials;
  }

  public TaskObject getMetricObject()
  {
    return metricObject;
  }

  public void setMetricObject(TaskObject metricObject)
  {
    this.metricObject = metricObject;
  }

  public int getStatusHistoryCount()
  {
    return statusHistoryCount;
  }

  public void setStatusHistoryCount(int statusHistoryCount)
  {
    this.statusHistoryCount = statusHistoryCount;
  }

  public Queue<String> getMapStatusHistory()
  {
    return mapStatusHistory;
  }

  public Queue<String> getReduceStatusHistory()
  {
    return reduceStatusHistory;
  }

  /**
   * Returns the physical memory history. The misspelled name is kept because it is public API.
   */
  public Queue<String> getPhysicalMemeoryStatusHistory()
  {
    return physicalMemoryStatusHistory;
  }

  public Queue<String> getVirtualMemoryStatusHistory()
  {
    return virtualMemoryStatusHistory;
  }

  public Queue<String> getCpuStatusHistory()
  {
    return cpuStatusHistory;
  }

  /**
   * A JSON document describing one task (or the job counters) together with a "modified" flag.
   */
  public static class TaskObject
  {
    /**
     * This field stores the task information as json
     */
    private JSONObject json;
    /**
     * This field tells if the object was modified
     */
    private boolean modified;

    public TaskObject(JSONObject json)
    {
      modified = true;
      this.json = json;
    }

    /**
     * This returns the task information as json
     *
     * @return the stored json, possibly null
     */
    public JSONObject getJson()
    {
      return json;
    }

    /**
     * This stores the task information as json
     *
     * @param json the task information
     */
    public void setJson(JSONObject json)
    {
      this.json = json;
    }

    /**
     * This returns if the json object has been modified
     *
     * @return true if the object is flagged as modified
     */
    public boolean isModified()
    {
      return modified;
    }

    /**
     * This sets if the json object is modified
     *
     * @param modified the new flag value
     */
    public void setModified(boolean modified)
    {
      this.modified = modified;
    }

    /**
     * This returns the string format of the json object
     *
     * @return the json as a string, or null when no json has been set
     */
    public String getJsonString()
    {
      return (json == null) ? null : json.toString();
    }
  }

}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mrmonitor/src/main/java/org/apache/apex/examples/mrmonitor/MRUtil.java ---------------------------------------------------------------------- diff --git a/examples/mrmonitor/src/main/java/org/apache/apex/examples/mrmonitor/MRUtil.java b/examples/mrmonitor/src/main/java/org/apache/apex/examples/mrmonitor/MRUtil.java new file mode 100644 index 0000000..1a82b99 --- /dev/null +++ b/examples/mrmonitor/src/main/java/org/apache/apex/examples/mrmonitor/MRUtil.java @@ -0,0 +1,99 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.apex.examples.mrmonitor;

import java.io.IOException;

import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.HttpClient;
import org.apache.http.client.ResponseHandler;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.BasicResponseHandler;
import org.apache.http.impl.client.DefaultHttpClient;

/**
 * <p>
 * Helpers for fetching a URL as text and parsing text into a {@link JSONObject}. Both helpers
 * report failure by returning {@code null} instead of throwing.
 * </p>
 *
 * @since 0.3.4
 */
public class MRUtil
{

  private static final Logger logger = LoggerFactory.getLogger(MRUtil.class);

  /**
   * This method returns the response content for a given url
   *
   * @param url the address to issue an HTTP GET against
   * @return the trimmed response body, or {@code null} when the url is invalid, the request fails,
   *         or the response carries no body
   */
  public static String getJsonForURL(String url)
  {
    logger.debug("{}", url);
    HttpClient httpclient = new DefaultHttpClient();
    try {
      // Building the request inside the try makes a malformed url yield null like any other failure.
      HttpGet httpget = new HttpGet(url);
      ResponseHandler<String> responseHandler = new BasicResponseHandler();
      String responseBody = httpclient.execute(httpget, responseHandler);
      // The handler returns null when the response has no entity; do not dereference it.
      return (responseBody == null) ? null : responseBody.trim();
    } catch (Exception e) {
      // Boundary catch: callers treat null as "no data", so nothing may escape from here.
      logger.debug("request to {} failed: {}", url, e.getMessage(), e);
      return null;
    } finally {
      httpclient.getConnectionManager().shutdown();
    }
  }

  /**
   * This method returns the JSONObject for a given string
   *
   * @param json the text to parse
   * @return the parsed object, or {@code null} when {@code json} is null or cannot be parsed
   */
  public static JSONObject getJsonObject(String json)
  {
    if (json == null) {
      return null;
    }
    try {
      return new JSONObject(json);
    } catch (Exception e) {
      logger.debug("{}", e.getMessage());
      return null;
    }
  }

}
 http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mrmonitor/src/main/java/org/apache/apex/examples/mrmonitor/MapToMRObjectOperator.java ---------------------------------------------------------------------- diff --git a/examples/mrmonitor/src/main/java/org/apache/apex/examples/mrmonitor/MapToMRObjectOperator.java b/examples/mrmonitor/src/main/java/org/apache/apex/examples/mrmonitor/MapToMRObjectOperator.java new file mode 100644 index 0000000..f4ce4fb --- /dev/null +++ b/examples/mrmonitor/src/main/java/org/apache/apex/examples/mrmonitor/MapToMRObjectOperator.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */
package org.apache.apex.examples.mrmonitor;

import java.util.Map;

import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.Operator;

/**
 * <p>
 * Converts each incoming key/value query map into an {@link MRStatusObject} and emits it.
 * Keys that match none of the known query constants are ignored.
 * </p>
 *
 * @since 0.9.0
 */
public class MapToMRObjectOperator implements Operator
{

  public final transient DefaultInputPort<Map<String, String>> input = new DefaultInputPort<Map<String, String>>()
  {
    @Override
    public void process(Map<String, String> tuple)
    {
      final MRStatusObject status = new MRStatusObject();
      for (Map.Entry<String, String> field : tuple.entrySet()) {
        applyQueryField(status, field.getKey(), field.getValue());
      }
      output.emit(status);
    }
  };

  public final transient DefaultOutputPort<MRStatusObject> output = new DefaultOutputPort<MRStatusObject>();

  /**
   * Copies one query entry onto the matching property of {@code status}.
   * Numeric properties are parsed with {@link Integer#parseInt(String)}, so a non-numeric value
   * raises {@link NumberFormatException}.
   */
  private void applyQueryField(MRStatusObject status, String key, String value)
  {
    if (key.equals(Constants.QUERY_KEY_COMMAND)) {
      status.setCommand(value);
      return;
    }
    if (key.equals(Constants.QUERY_API_VERSION)) {
      status.setApiVersion(value);
      return;
    }
    if (key.equals(Constants.QUERY_APP_ID)) {
      status.setAppId(value);
      return;
    }
    if (key.equals(Constants.QUERY_HADOOP_VERSION)) {
      status.setHadoopVersion(Integer.parseInt(value));
      return;
    }
    if (key.equals(Constants.QUERY_HOST_NAME)) {
      status.setUri(value);
      return;
    }
    if (key.equals(Constants.QUERY_HS_PORT)) {
      status.setHistoryServerPort(Integer.parseInt(value));
      return;
    }
    if (key.equals(Constants.QUERY_JOB_ID)) {
      status.setJobId(value);
      return;
    }
    if (key.equals(Constants.QUERY_RM_PORT)) {
      status.setRmPort(Integer.parseInt(value));
    }
  }

  @Override
  public void setup(OperatorContext context)
  {
    // nothing to initialise
  }

  @Override
  public void teardown()
  {
    // nothing to release
  }

  @Override
  public void beginWindow(long windowId)
  {
    // no per-window state
  }

  @Override
  public void endWindow()
  {
    // no per-window state
  }

}
 http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mrmonitor/src/main/resources/META-INF/properties.xml ---------------------------------------------------------------------- diff --git a/examples/mrmonitor/src/main/resources/META-INF/properties.xml b/examples/mrmonitor/src/main/resources/META-INF/properties.xml new file mode 100644 index 0000000..fdda52b --- /dev/null +++ b/examples/mrmonitor/src/main/resources/META-INF/properties.xml @@ -0,0 +1,63 @@ +<?xml version="1.0"?> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License.
+ +-->
<configuration>
  <!-- JobMonitor operator: partitioning, application window and job limit -->
  <property>
    <name>dt.application.MRMonitoringExample.operator.JobMonitor.attr.PARTITIONER</name>
    <value>com.datatorrent.common.partitioner.StatelessPartitioner:1</value>
  </property>
  <property>
    <name>dt.application.MRMonitoringExample.operator.JobMonitor.attr.APPLICATION_WINDOW_COUNT</name>
    <value>4</value>
  </property>
  <!-- maxJobs was declared twice with the same value; a single declaration is kept -->
  <property>
    <name>dt.application.MRMonitoringExample.operator.JobMonitor.prop.maxJobs</name>
    <value>25</value>
  </property>
  <!-- Pub/sub topics; mrdebugger.html publishes and subscribes to the same topic names -->
  <property>
    <name>dt.application.MRMonitoringExample.operator.Query.prop.topic</name>
    <value>contrib.summit.mrDebugger.mrDebuggerQuery</value>
  </property>
  <property>
    <name>dt.application.MRMonitoringExample.operator.JobOutput.prop.topic</name>
    <value>contrib.summit.mrDebugger.jobResult</value>
  </property>
  <property>
    <name>dt.application.MRMonitoringExample.operator.MapJob.prop.topic</name>
    <value>contrib.summit.mrDebugger.mapResult</value>
  </property>
  <property>
    <name>dt.application.MRMonitoringExample.operator.ReduceJob.prop.topic</name>
    <value>contrib.summit.mrDebugger.reduceResult</value>
  </property>
  <property>
    <name>dt.application.MRMonitoringExample.operator.JobCounter.prop.topic</name>
    <value>contrib.summit.mrDebugger.counterResult</value>
  </property>
  <property>
    <name>dt.application.MRMonitoringExample.stream.QueryConversion.locality</name>
    <value>CONTAINER_LOCAL</value>
  </property>
</configuration>
 http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mrmonitor/src/main/resources/mrdebugger.html ---------------------------------------------------------------------- diff --git a/examples/mrmonitor/src/main/resources/mrdebugger.html b/examples/mrmonitor/src/main/resources/mrdebugger.html new file mode 100644 index 0000000..dddde06 --- /dev/null +++
b/examples/mrmonitor/src/main/resources/mrdebugger.html @@ -0,0 +1,237 @@
<!--

    Licensed to the Apache Software Foundation (ASF) under one
    or more contributor license agreements.  See the NOTICE file
    distributed with this work for additional information
    regarding copyright ownership.  The ASF licenses this file
    to you under the Apache License, Version 2.0 (the
    "License"); you may not use this file except in compliance
    with the License.  You may obtain a copy of the License at

      http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing,
    software distributed under the License is distributed on an
    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    KIND, either express or implied.  See the License for the
    specific language governing permissions and limitations
    under the License.

-->
<!doctype html>
<html>
<head>
<title>MR Debugger Example</title>

<META HTTP-EQUIV="CACHE-CONTROL" CONTENT="NO-CACHE">
<meta name="viewport" content="initial-scale=1.0, user-scalable=no">
<meta charset="utf-8">
<script src="http://ajax.googleapis.com/ajax/libs/jquery/1.7.1/jquery.min.js"></script>
<script src="http://ajax.googleapis.com/ajax/libs/jqueryui/1.10.3/jquery-ui.min.js"></script>
<!--script src="js/vendor/jquery/dist/jquery.js"></script-->
<style>
    body {
        margin: 0;
    }

    .phone-input {
        margin-left: 0.5em;
        margin-right: 0.5em;
    }
</style>

</head>


<body>


<script>

$(function() {

    // Shows a confirmation message below the form once a query has been published.
    function showConfirmation(headline) {
        $('#query1SubmitConfirm').html("<div id='message'></div>");
        // The headline can contain user input, so it is set as text rather than as HTML.
        $('#message').append($("<h2></h2>").text(headline))
        .append("<p>Result will appear on page shortly.</p>");
    }

    $("#query1AddButton").click(function() {

        var app_id = $("input#app_id").val();
        var hostname = $("input#hostname").val();
        var job_id = $("input#job_id").val();
        var hadoop_version = $("input#hadoop_version").val();
        var api_version = $("input#api_version").val();
        var rm_port = $("input#rm_port").val();
        var hs_port = $("input#hs_port").val();

        var jsonData = {
            command : 'add',
            hostname:hostname,
            app_id:app_id,
            job_id:job_id,
            hadoop_version:hadoop_version,
            api_version:api_version,
            rm_port:rm_port,
            hs_port:hs_port
        };

        sendQuery(jsonData, function() {
            showConfirmation("Add submitted to application!");
        });

        // Returning false stops the browser from submitting the form and reloading the page.
        return false;
    });

    $("#query1DeleteButton").click(function() {

        var job_id = $("input#job_id").val();

        var jsonData = {
            command : 'delete',
            query : job_id
        };

        sendQuery(jsonData, function() {
            // The message used to reference an undefined variable "phone", which threw a ReferenceError.
            showConfirmation("Delete " + job_id + " submitted to application!");
        });

        return false;
    });

    // Publishes one query on the query topic over a short-lived WebSocket, then calls callback.
    function sendQuery(jsonData, callback) {
        var ws = new WebSocket('ws://'+window.location.host+'/pubsub');

        ws.onopen = function () {
            var topic = "contrib.summit.mrDebugger.mrDebuggerQuery";
            var msg = JSON.stringify({ "type" : "publish", "topic" : topic, "data" : jsonData });
            ws.send(msg);
            console.log("published to: " + topic + " data: " + msg);
            ws.close();
            if (callback) callback();
        };

        ws.onerror = function (error) {
            console.log('WebSocket Error', error);
        };

        ws.onmessage = function (e) {
            console.log('Server: ' + e.data);
        };
        ws.onclose = function (e) {
            console.log('close: ' , e);
        };

    }

    // Subscribes to a pub/sub topic and appends every received message to the target element.
    function subscribe(topic, targetSelector) {
        var socket = new WebSocket('ws://'+window.location.host+'/pubsub');

        socket.onopen = function () {
            var msg = JSON.stringify({ "type":"subscribe", "topic": topic});
            console.log("sending: " + msg);
            socket.send(msg);
        };

        socket.onerror = function (error) {
            console.log('WebSocket Error', error);
        };

        socket.onmessage = function (e) {
            // Append as a text node: the payload comes from the server and must not be parsed as HTML.
            $(targetSelector).append(document.createTextNode(e.data+"\n"));
        };

        return socket;
    }

    subscribe("contrib.summit.mrDebugger.jobResult", '#jobQueryResult');
    subscribe("contrib.summit.mrDebugger.mapResult", '#jobMapQueryResult');
    subscribe("contrib.summit.mrDebugger.reduceResult", '#jobReduceQueryResult');

  });


</script>


<div id="query1FormDiv">
  <form name="query1" action="">
    <p>
      <label for="app_id" id="app_id_label">Application Id</label>
      <input type="text" name="app_id" id="app_id" size="30" value="" class="phone-input" />
    </p>
    <p>
      <label for="job_id" id="job_id_label">Job Id</label>
      <input type="text" name="job_id" id="job_id" size="30" value="" class="phone-input" />
    </p>
    <p>
      <label for="hostname" id="hostname_label">Hostname</label>
      <input type="text" name="hostname" id="hostname" size="30" value="" class="phone-input" />
    </p>
    <p>
      <label for="rm_port" id="rm_port_label">RM port</label>
      <input type="text" name="rm_port" id="rm_port" size="30" value="" class="phone-input" />
    </p>
    <p>
      <label for="hs_port" id="hs_port_label">History Server port</label>
      <input type="text" name="hs_port" id="hs_port" size="30" value="" class="phone-input" />
    </p>
    <p>
      <label for="hadoop_version" id="hadoop_version_label">Hadoop Version</label>
      <input type="text" name="hadoop_version" id="hadoop_version" size="30" value="" class="phone-input" />
    </p>
    <p>
      <label for="api_version" id="api_version_label">API Version</label>
      <input type="text" name="api_version" id="api_version" size="30" value="" class="phone-input" />
    </p>
    <p>
      <input type="submit" name="command" class="button" id="query1AddButton" value="Add" />
      <input type="submit" name="command" class="button" id="query1DeleteButton" value="Delete" />
      <input type="submit" name="command" class="button" id="query1ClearButton" value="Clear" />
    </p>
  </form>
  <div id="query1SubmitConfirm"></div>
  <div>Job: <span id="jobQueryResult"></span></div>
  <div>Map Task: <span id="jobMapQueryResult"></span></div>
  <div>Reduce Job: <span id="jobReduceQueryResult"></span></div>
</div>

</body>
</html>
 http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mrmonitor/src/test/java/org/apache/apex/examples/mrmonitor/MrMonitoringApplicationTest.java ---------------------------------------------------------------------- diff --git a/examples/mrmonitor/src/test/java/org/apache/apex/examples/mrmonitor/MrMonitoringApplicationTest.java b/examples/mrmonitor/src/test/java/org/apache/apex/examples/mrmonitor/MrMonitoringApplicationTest.java new file mode 100644 index 0000000..b094ddb --- /dev/null +++ b/examples/mrmonitor/src/test/java/org/apache/apex/examples/mrmonitor/MrMonitoringApplicationTest.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.
See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.apex.examples.mrmonitor;

import javax.servlet.Servlet;

import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.junit.Test;

import org.apache.hadoop.conf.Configuration;

import com.datatorrent.api.LocalMode;
import com.datatorrent.lib.helper.SamplePubSubWebSocketServlet;

/**
 * <p>MrMonitoringApplicationTest class.</p>
 *
 * Runs {@link MRMonitoringApplication} in local mode against an embedded Jetty
 * server that exposes a {@link SamplePubSubWebSocketServlet}.
 *
 * @since 0.3.4
 */

public class MrMonitoringApplicationTest
{

  /**
   * Starts the embedded pub/sub server on an ephemeral port, points the
   * application's gateway address at it and runs the DAG for 10000 ms.
   *
   * @throws Exception if the server cannot be started/stopped or the DAG fails
   */
  @Test
  public void testApplication() throws Exception
  {
    Configuration conf = new Configuration(false);
    conf.addResource("dt-site-monitoring.xml");
    Server server = new Server(0);
    Servlet servlet = new SamplePubSubWebSocketServlet();
    ServletHolder sh = new ServletHolder(servlet);
    ServletContextHandler contextHandler = new ServletContextHandler(server, "/", ServletContextHandler.SESSIONS);
    contextHandler.addServlet(sh, "/pubsub");
    contextHandler.addServlet(sh, "/*");
    server.start();
    try {
      // Port 0 lets Jetty choose a free port; read back the one actually bound.
      Connector[] connector = server.getConnectors();
      conf.set("dt.attr.GATEWAY_CONNECT_ADDRESS", "localhost:" + connector[0].getLocalPort());

      MRMonitoringApplication application = new MRMonitoringApplication();
      LocalMode lma = LocalMode.newInstance();
      lma.prepareDAG(application, conf);
      LocalMode.Controller lc = lma.getController();
      lc.run(10000);
    } finally {
      // Always release the port and server threads, even when the run above throws.
      server.stop();
    }
  }

}
 http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mrmonitor/src/test/resources/dt-site-monitoring.xml ---------------------------------------------------------------------- diff --git a/examples/mrmonitor/src/test/resources/dt-site-monitoring.xml b/examples/mrmonitor/src/test/resources/dt-site-monitoring.xml new file mode 100644 index 0000000..88bae44 --- /dev/null +++ 
b/examples/mrmonitor/src/test/resources/dt-site-monitoring.xml @@ -0,0 +1,63 @@
<?xml version="1.0"?>
<!--

    Licensed to the Apache Software Foundation (ASF) under one
    or more contributor license agreements. See the NOTICE file
    distributed with this work for additional information
    regarding copyright ownership. The ASF licenses this file
    to you under the Apache License, Version 2.0 (the
    "License"); you may not use this file except in compliance
    with the License. You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing,
    software distributed under the License is distributed on an
    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    KIND, either express or implied. See the License for the
    specific language governing permissions and limitations
    under the License.

-->
<configuration>
  <!-- JobMonitor operator: attributes and properties -->
  <property>
    <name>dt.application.MRMonitoringExample.operator.JobMonitor.attr.INITIAL_PARTITION_COUNT</name>
    <value>1</value>
  </property>
  <property>
    <name>dt.application.MRMonitoringExample.operator.JobMonitor.attr.APPLICATION_WINDOW_COUNT</name>
    <value>4</value>
  </property>
  <!-- maxJobs was previously declared twice with the same value; one entry is enough. -->
  <property>
    <name>dt.application.MRMonitoringExample.operator.JobMonitor.prop.maxJobs</name>
    <value>25</value>
  </property>
  <!-- Pub/sub topics; mrdebugger.html uses the query, job, map and reduce topics. -->
  <property>
    <name>dt.application.MRMonitoringExample.operator.Query.prop.topic</name>
    <value>contrib.summit.mrDebugger.mrDebuggerQuery</value>
  </property>
  <property>
    <name>dt.application.MRMonitoringExample.operator.JobOutput.prop.topic</name>
    <value>contrib.summit.mrDebugger.jobResult</value>
  </property>
  <property>
    <name>dt.application.MRMonitoringExample.operator.MapJob.prop.topic</name>
    <value>contrib.summit.mrDebugger.mapResult</value>
  </property>
  <property>
    <name>dt.application.MRMonitoringExample.operator.ReduceJob.prop.topic</name>
    <value>contrib.summit.mrDebugger.reduceResult</value>
  </property>
  <property>
    <name>dt.application.MRMonitoringExample.operator.JobCounter.prop.topic</name>
    <value>contrib.summit.mrDebugger.counterResult</value>
  </property>
  <property>
    <name>dt.application.MRMonitoringExample.stream.QueryConversion.locality</name>
    <value>CONTAINER_LOCAL</value>
  </property>
</configuration>
 http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mrmonitor/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/examples/mrmonitor/src/test/resources/log4j.properties b/examples/mrmonitor/src/test/resources/log4j.properties new file mode 100644 index 0000000..cf0d19e --- /dev/null +++ b/examples/mrmonitor/src/test/resources/log4j.properties @@ -0,0 +1,43 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# + +log4j.rootLogger=DEBUG,CONSOLE + +log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender +log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout +log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n +log4j.appender.CONSOLE.threshold=${test.log.console.threshold} +test.log.console.threshold=DEBUG + +log4j.appender.RFA=org.apache.log4j.RollingFileAppender +log4j.appender.RFA.layout=org.apache.log4j.PatternLayout +log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n +log4j.appender.RFA.File=/tmp/app.log + +# to enable, add SYSLOG to rootLogger +log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender +log4j.appender.SYSLOG.syslogHost=127.0.0.1 +log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout +log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - %m%n +log4j.appender.SYSLOG.Facility=LOCAL1 + +log4j.logger.org=info +#log4j.logger.org.apache.commons.beanutils=warn +log4j.logger.com.datatorrent=debug +log4j.logger.org.apache.apex=debug http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mroperator/pom.xml ---------------------------------------------------------------------- diff --git a/examples/mroperator/pom.xml b/examples/mroperator/pom.xml new file mode 100644 index 0000000..c9a7b65 --- /dev/null +++ b/examples/mroperator/pom.xml @@ -0,0 +1,56 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <artifactId>malhar-examples-mroperator</artifactId> + <packaging>jar</packaging> + + <name>Apache Apex Malhar MR Operator Example</name> + <description></description> + + <parent> + <groupId>org.apache.apex</groupId> + <artifactId>malhar-examples</artifactId> + <version>3.7.0-SNAPSHOT</version> + </parent> + + <properties> + <skipTests>true</skipTests> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-core</artifactId> + <version>${hadoop.version}</version> + <scope>provided</scope> + <exclusions> + <exclusion> + <groupId>*</groupId> + <artifactId>*</artifactId> + </exclusion> + </exclusions> + </dependency> + </dependencies> + +</project> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mroperator/src/assemble/appPackage.xml ---------------------------------------------------------------------- diff --git a/examples/mroperator/src/assemble/appPackage.xml b/examples/mroperator/src/assemble/appPackage.xml new file mode 100644 index 0000000..4138cf2 --- /dev/null +++ b/examples/mroperator/src/assemble/appPackage.xml @@ -0,0 +1,59 @@ +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. 
See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +--> +<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd"> + <id>appPackage</id> + <formats> + <format>jar</format> + </formats> + <includeBaseDirectory>false</includeBaseDirectory> + <fileSets> + <fileSet> + <directory>${basedir}/target/</directory> + <outputDirectory>/app</outputDirectory> + <includes> + <include>${project.artifactId}-${project.version}.jar</include> + </includes> + </fileSet> + <fileSet> + <directory>${basedir}/target/deps</directory> + <outputDirectory>/lib</outputDirectory> + </fileSet> + <fileSet> + <directory>${basedir}/src/site/conf</directory> + <outputDirectory>/conf</outputDirectory> + <includes> + <include>*.xml</include> + </includes> + </fileSet> + <fileSet> + <directory>${basedir}/src/main/resources/META-INF</directory> + <outputDirectory>/META-INF</outputDirectory> + </fileSet> + <fileSet> + <directory>${basedir}/src/main/resources/app</directory> + <outputDirectory>/app</outputDirectory> + </fileSet> + </fileSets> + +</assembly> + 
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/DateWritable.java ---------------------------------------------------------------------- diff --git a/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/DateWritable.java b/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/DateWritable.java new file mode 100644 index 0000000..4cf1f5a --- /dev/null +++ b/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/DateWritable.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.examples.mroperator; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.text.SimpleDateFormat; +import java.util.Date; + +import org.apache.hadoop.io.WritableComparable; + +/** + * <p>DateWritable class.</p> + * + * @since 0.9.0 + */ +public class DateWritable implements WritableComparable<DateWritable> +{ + private static final SimpleDateFormat formatter = new SimpleDateFormat( "yyyy-MM-dd' T 'HH:mm:ss.SSS" ); + private Date date; + + public Date getDate() + { + return date; + } + + public void setDate( Date date ) + { + this.date = date; + } + + public void readFields( DataInput in ) throws IOException + { + date = new Date( in.readLong() ); + } + + public void write( DataOutput out ) throws IOException + { + out.writeLong( date.getTime() ); + } + + @Override + public boolean equals(Object o) + { + return toString().equals(o.toString()); + } + + @Override + public int hashCode() + { + return toString().hashCode(); + } + + public String toString() + { + return formatter.format( date); + } + + public int compareTo( DateWritable other ) + { + return date.compareTo( other.getDate() ); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/HdfsKeyValOutputOperator.java ---------------------------------------------------------------------- diff --git a/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/HdfsKeyValOutputOperator.java b/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/HdfsKeyValOutputOperator.java new file mode 100644 index 0000000..94e17c1 --- /dev/null +++ b/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/HdfsKeyValOutputOperator.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.apex.examples.mroperator;

import java.nio.charset.StandardCharsets;

import com.datatorrent.lib.io.fs.AbstractSingleFileOutputOperator;
import com.datatorrent.lib.util.KeyHashValPair;

/**
 * Adapter for writing KeyHashValPair objects to HDFS
 * <p>
 * Serializes tuples into a HDFS file.<br/>
 * </p>
 *
 * @param <K> Key type
 * @param <V> Value type
 * @since 0.9.4
 */
public class HdfsKeyValOutputOperator<K, V> extends AbstractSingleFileOutputOperator<KeyHashValPair<K, V>>
{
  /**
   * Renders a tuple as its {@code toString()} form followed by a newline.
   *
   * @param t the tuple to serialize
   * @return the UTF-8 bytes of the rendered line; the charset is fixed so the
   *         output does not vary with the JVM's platform default encoding
   */
  @Override
  public byte[] getBytesForTuple(KeyHashValPair<K,V> t)
  {
    return (t.toString() + "\n").getBytes(StandardCharsets.UTF_8);
  }
}
 http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/InvertedIndexApplication.java ---------------------------------------------------------------------- diff --git a/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/InvertedIndexApplication.java b/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/InvertedIndexApplication.java new file mode 100644 index 0000000..af97bc4 --- /dev/null +++ b/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/InvertedIndexApplication.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) 
under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.apex.examples.mroperator;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.TextInputFormat;

import com.datatorrent.api.annotation.ApplicationAnnotation;

/**
 * <p>InvertedIndexApplication class.</p>
 *
 * Configures {@link MapReduceApplication} with the {@link LineIndexer} mapper
 * and reducer and a {@link TextInputFormat} input.
 *
 * @since 0.9.0
 */
@ApplicationAnnotation(name = "InvertedIndexExample")
public class InvertedIndexApplication extends MapReduceApplication<LongWritable, Text, Text, Text>
{

  /**
   * Public so that the class can be instantiated reflectively from outside this
   * package; the constructor was previously package-private.
   */
  public InvertedIndexApplication()
  {
    setMapClass(LineIndexer.LineIndexMapper.class);
    setReduceClass(LineIndexer.LineIndexReducer.class);

    setInputFormat(TextInputFormat.class);

  }

}
 http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/LineIndexer.java ---------------------------------------------------------------------- diff --git a/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/LineIndexer.java b/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/LineIndexer.java new file mode 100644 index 0000000..a2c589d --- /dev/null +++ b/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/LineIndexer.java @@ 
-0,0 +1,120 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.apex.examples.mroperator;

import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

/**
 * <p>LineIndexer class.</p>
 *
 * Builds an inverted index: for every lower-cased token it lists the names of
 * the input files in which the token occurs.
 *
 * @since 0.9.0
 */
public class LineIndexer
{

  /**
   * Emits one (token, file name) pair for every whitespace-separated token of a line.
   */
  public static class LineIndexMapper extends MapReduceBase
      implements Mapper<LongWritable, Text, Text, Text>
  {
    // Scratch objects reused between map() calls. They are per-instance (they
    // used to be static) so that mapper instances living in the same JVM do not
    // overwrite each other's key and value.
    private final Text word = new Text();
    private final Text location = new Text();

    /**
     * @param key      offset of the line (unused)
     * @param val      the line to tokenize
     * @param output   receives (token, file name) pairs
     * @param reporter supplies the input split, which must be a {@link FileSplit}
     */
    @Override
    public void map(LongWritable key, Text val,
        OutputCollector<Text, Text> output, Reporter reporter) throws IOException
    {
      FileSplit fileSplit = (FileSplit)reporter.getInputSplit();
      String fileName = fileSplit.getPath().getName();
      location.set(fileName);

      String line = val.toString();
      StringTokenizer itr = new StringTokenizer(line.toLowerCase());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        output.collect(word, location);
      }
    }
  }



  /**
   * Joins all locations collected for a token into one comma-separated value.
   */
  public static class LineIndexReducer extends MapReduceBase
      implements Reducer<Text, Text, Text, Text>
  {
    /**
     * @param key    the token
     * @param values the locations emitted for the token
     * @param output receives (token, "loc1, loc2, ...")
     */
    @Override
    public void reduce(Text key, Iterator<Text> values,
        OutputCollector<Text, Text> output, Reporter reporter) throws IOException
    {
      boolean first = true;
      StringBuilder toReturn = new StringBuilder();
      while (values.hasNext()) {
        if (!first) {
          toReturn.append(", ");
        }
        first = false;
        toReturn.append(values.next().toString());
      }

      output.collect(key, new Text(toReturn.toString()));
    }
  }


  /**
   * The actual main() method for our program; this is the
   * "driver" for the MapReduce job. It reads from the relative path
   * "input" and writes to the relative path "output".
   */
  public static void main(String[] args)
  {
    JobConf conf = new JobConf(LineIndexer.class);

    conf.setJobName("LineIndexer");

    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(Text.class);

    FileInputFormat.addInputPath(conf, new Path("input"));
    FileOutputFormat.setOutputPath(conf, new Path("output"));

    conf.setMapperClass(LineIndexMapper.class);
    conf.setReducerClass(LineIndexReducer.class);

    // JobClient.runJob is static, so the JobClient instance that used to be
    // created and configured here was never used and has been dropped.
    try {
      JobClient.runJob(conf);
    } catch (Exception e) {
      e.printStackTrace();
    }
  }
}
 http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/LogCountsPerHour.java ---------------------------------------------------------------------- diff --git a/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/LogCountsPerHour.java b/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/LogCountsPerHour.java new file mode 100644 
index 0000000..6255810 --- /dev/null +++ b/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/LogCountsPerHour.java @@ -0,0 +1,187 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.apex.examples.mroperator;

import java.io.IOException;
import java.util.Calendar;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * <p>LogCountsPerHour class.</p>
 *
 * Counts access-log lines per hour: the mapper truncates each line's timestamp
 * to the hour and emits (hour, 1); the reducer sums the counts.
 *
 * @since 0.9.0
 */
public class LogCountsPerHour extends Configured implements Tool
{

  /**
   * Emits (timestamp truncated to the hour, 1) for every line that contains a
   * bracketed timestamp of the form {@code [16/Dec/2012:05:32:50 -0500]}.
   */
  public static class LogMapClass extends MapReduceBase
      implements Mapper<LongWritable, Text, DateWritable, IntWritable>
  {
    /** Month abbreviations; the array index equals the matching Calendar month constant. */
    private static final String[] MONTHS = {
      "jan", "feb", "mar", "apr", "may", "jun", "jul", "aug", "sep", "oct", "nov", "dec"
    };

    private DateWritable date = new DateWritable();
    private static final IntWritable one = new IntWritable(1);

    /**
     * Looks up a three-letter month abbreviation, ignoring case.
     *
     * @return the Calendar month constant (0-11), or -1 if the text is not a known abbreviation
     */
    private static int monthOf(String month)
    {
      for (int i = 0; i < MONTHS.length; i++) {
        if (MONTHS[i].equalsIgnoreCase(month)) {
          return i;
        }
      }
      return -1;
    }

    /**
     * Lines without a {@code [...]} section are skipped. The time-zone offset
     * in the timestamp is not applied; the fields are set on a calendar in the
     * JVM's default time zone. Malformed numbers inside the brackets still
     * raise NumberFormatException, as before.
     */
    @Override
    public void map(LongWritable key, Text value, OutputCollector<DateWritable, IntWritable> output, Reporter reporter) throws IOException
    {
      // Get the value as a String; it is of the format:
      // 111.111.111.111 - - [16/Dec/2012:05:32:50 -0500] "GET / HTTP/1.1" 200 14791 "-" "Mozilla/5.0 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)"
      String text = value.toString();

      // Locate the timestamp. The closing bracket is searched for after the
      // opening one, so a stray ']' earlier in the line can no longer produce
      // an invalid substring range.
      int openBracket = text.indexOf('[');
      int closeBracket = text.indexOf(']', openBracket + 1);
      if (openBracket == -1 || closeBracket == -1) {
        return;
      }

      String dateString = text.substring(openBracket + 1, closeBracket);

      // Split a string of the form: 16/Dec/2012:05:32:50 -0500
      int index = 0;
      int nextIndex = dateString.indexOf('/');
      int day = Integer.parseInt(dateString.substring(index, nextIndex));

      index = nextIndex;
      nextIndex = dateString.indexOf('/', index + 1);
      String month = dateString.substring(index + 1, nextIndex);

      index = nextIndex;
      nextIndex = dateString.indexOf(':', index);
      int year = Integer.parseInt(dateString.substring(index + 1, nextIndex));

      index = nextIndex;
      nextIndex = dateString.indexOf(':', index + 1);
      int hour = Integer.parseInt(dateString.substring(index + 1, nextIndex));

      // Build a calendar object for this date, truncated to the hour
      Calendar calendar = Calendar.getInstance();
      calendar.set(Calendar.DATE, day);
      calendar.set(Calendar.YEAR, year);
      // The log uses a 24-hour clock. Calendar.HOUR is the 12-hour field and is
      // combined with the calendar's current AM/PM value, which shifted the result.
      calendar.set(Calendar.HOUR_OF_DAY, hour);
      calendar.set(Calendar.MINUTE, 0);
      calendar.set(Calendar.SECOND, 0);
      calendar.set(Calendar.MILLISECOND, 0);

      // An unrecognized abbreviation leaves the calendar's month untouched,
      // exactly as the original if/else chain did.
      int monthIndex = monthOf(month);
      if (monthIndex != -1) {
        calendar.set(Calendar.MONTH, monthIndex);
      }

      // Output the date as the key and 1 as the value
      date.setDate(calendar.getTime());
      output.collect(date, one);
    }
  }

  /**
   * Sums the counts emitted for one hour; also used as the combiner.
   */
  public static class LogReduce extends MapReduceBase
      implements Reducer<DateWritable, IntWritable, DateWritable, IntWritable>
  {
    @Override
    public void reduce(DateWritable key, Iterator<IntWritable> values, OutputCollector<DateWritable, IntWritable> output, Reporter reporter) throws IOException
    {
      // Iterate over all of the values (partial counts for this hour)
      int count = 0;
      while (values.hasNext()) {
        // Add the value to our count
        count += values.next().get();
      }

      // Output the hour with its count (wrapped in an IntWritable)
      output.collect(key, new IntWritable(count));
    }
  }


  /**
   * Configures and runs the job.
   *
   * @param args {@code args[0]} is the input path, {@code args[1]} the output path
   * @return 0 when the job was run, 2 when the arguments are missing
   * @throws Exception if the job fails
   */
  @Override
  public int run(String[] args) throws Exception
  {
    if (args == null || args.length < 2) {
      System.err.println("Usage: LogCountsPerHour <input path> <output path>");
      return 2;
    }

    // Create a configuration
    Configuration conf = getConf();

    // Create a job from the configuration that will use the LogCountsPerHour class
    JobConf job = new JobConf(conf, LogCountsPerHour.class);

    // Define our input path as the first command line argument and our output path as the second
    Path in = new Path(args[0]);
    Path out = new Path(args[1]);

    // Create File Input/Output formats for these paths (in the job)
    FileInputFormat.setInputPaths(job, in);
    FileOutputFormat.setOutputPath(job, out);

    // Configure the job: name, mapper, reducer, and combiner
    job.setJobName("LogAveragePerHour");
    job.setMapperClass(LogMapClass.class);
    job.setReducerClass(LogReduce.class);
    job.setCombinerClass(LogReduce.class);

    // Configure the output
    job.setOutputFormat(TextOutputFormat.class);
    job.setOutputKeyClass(DateWritable.class);
    job.setOutputValueClass(IntWritable.class);

    // Run the job
    JobClient.runJob(job);
    return 0;
  }

  public static void main(String[] args) throws Exception
  {
    // Start the LogCountsPerHour MapReduce application
    int res = ToolRunner.run(new Configuration(), new LogCountsPerHour(), args);
    System.exit(res);
  }
}
 http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/LogsCountApplication.java ---------------------------------------------------------------------- diff --git a/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/LogsCountApplication.java b/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/LogsCountApplication.java new file mode 100644 index 0000000..51b082d --- /dev/null +++ b/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/LogsCountApplication.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.examples.mroperator; + +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.TextInputFormat; + +import com.datatorrent.api.annotation.ApplicationAnnotation; + +/** + * <p>LogsCountApplication class.</p> + * + * @since 0.9.0 + */ +@ApplicationAnnotation(name = "LogsCountExample") +public class LogsCountApplication extends MapReduceApplication<LongWritable, Text, DateWritable, IntWritable> +{ + + public void LogsCountApplication() + { + setMapClass(LogCountsPerHour.LogMapClass.class); + // setCombineClass(LogCountsPerHour.LogReduce.class); + setReduceClass(LogCountsPerHour.LogReduce.class); + setInputFormat(TextInputFormat.class); + + } + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/MapOperator.java ---------------------------------------------------------------------- diff --git a/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/MapOperator.java b/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/MapOperator.java new file mode 100644 index 0000000..ce00f54 --- /dev/null +++ b/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/MapOperator.java @@ -0,0 +1,414 @@ +/** + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.examples.mroperator; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import javax.validation.constraints.Min; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.apex.examples.mroperator.ReporterImpl.ReporterType; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.serializer.Deserializer; +import org.apache.hadoop.io.serializer.SerializationFactory; +import org.apache.hadoop.io.serializer.Serializer; +import org.apache.hadoop.mapred.Counters; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.KeyValueTextInputFormat; +import org.apache.hadoop.mapred.Mapper; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.RecordReader; +import 
org.apache.hadoop.mapred.Reducer; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapred.TextInputFormat; + +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.DefaultPartition; +import com.datatorrent.api.InputOperator; +import com.datatorrent.api.Partitioner; + +import com.datatorrent.lib.util.KeyHashValPair; + +/** + * <p> + * MapOperator class. + * </p> + * + * @since 0.9.0 + */ +@SuppressWarnings({ "unchecked"}) +public class MapOperator<K1, V1, K2, V2> implements InputOperator, Partitioner<MapOperator<K1, V1, K2, V2>> +{ + + private static final Logger logger = LoggerFactory.getLogger(MapOperator.class); + private String dirName; + private boolean emitPartitioningCountOnce = false; + private boolean emitLastCountOnce = false; + private int operatorId; + private Class<? extends InputFormat<K1, V1>> inputFormatClass; + private transient InputFormat<K1, V1> inputFormat; + private transient InputSplit inputSplit; + private Class<? extends InputSplit> inputSplitClass; + private ByteArrayOutputStream outstream = new ByteArrayOutputStream(); + private transient RecordReader<K1, V1> reader; + private boolean emittedAll = false; + public final transient DefaultOutputPort<KeyHashValPair<Integer, Integer>> outputCount = new DefaultOutputPort<KeyHashValPair<Integer, Integer>>(); + public final transient DefaultOutputPort<KeyHashValPair<K2, V2>> output = new DefaultOutputPort<KeyHashValPair<K2, V2>>(); + private transient JobConf jobConf; + @Min(1) + private int partitionCount = 1; + + public Class<? extends InputSplit> getInputSplitClass() + { + return inputSplitClass; + } + + public void setInputSplitClass(Class<? extends InputSplit> inputSplitClass) + { + this.inputSplitClass = inputSplitClass; + } + + public Class<? extends InputFormat<K1, V1>> getInputFormatClass() + { + return inputFormatClass; + } + + public void setInputFormatClass(Class<? 
extends InputFormat<K1, V1>> inputFormatClass) + { + this.inputFormatClass = inputFormatClass; + } + + public String getDirName() + { + return dirName; + } + + public void setDirName(String dirName) + { + this.dirName = dirName; + } + + public int getPartitionCount() + { + return partitionCount; + } + + public void setPartitionCount(int partitionCount) + { + this.partitionCount = partitionCount; + } + + @Override + public void beginWindow(long windowId) + { + if (!emitPartitioningCountOnce) { + outputCount.emit(new KeyHashValPair<Integer, Integer>(operatorId, 1)); + emitPartitioningCountOnce = true; + } + if (reader == null) { + try { + reader = inputFormat.getRecordReader(inputSplit, new JobConf(new Configuration()), reporter); + } catch (IOException e) { + logger.info("error getting record reader {}", e.getMessage()); + } + } + } + + @Override + public void teardown() + { + + } + + @Override + public void setup(OperatorContext context) + { + if (context != null) { + operatorId = context.getId(); + } + reporter = new ReporterImpl(ReporterType.Mapper, new Counters()); + outputCollector = new OutputCollectorImpl<K2, V2>(); + Configuration conf = new Configuration(); + try { + inputFormat = inputFormatClass.newInstance(); + SerializationFactory serializationFactory = new SerializationFactory(conf); + Deserializer keyDesiralizer = serializationFactory.getDeserializer(inputSplitClass); + keyDesiralizer.open(new ByteArrayInputStream(outstream.toByteArray())); + inputSplit = (InputSplit)keyDesiralizer.deserialize(null); + ((ReporterImpl)reporter).setInputSplit(inputSplit); + reader = inputFormat.getRecordReader(inputSplit, new JobConf(conf), reporter); + } catch (Exception e) { + logger.info("failed to initialize inputformat obj {}", inputFormat); + throw new RuntimeException(e); + } + InputStream stream = null; + if (configFile != null && configFile.length() > 0) { + stream = ClassLoader.getSystemResourceAsStream("/" + configFile); + if (stream == null) { + stream = 
ClassLoader.getSystemResourceAsStream(configFile); + } + } + if (stream != null) { + conf.addResource(stream); + } + jobConf = new JobConf(conf); + if (mapClass != null) { + try { + mapObject = mapClass.newInstance(); + } catch (Exception e) { + logger.info("can't instantiate object {}", e.getMessage()); + } + + mapObject.configure(jobConf); + } + if (combineClass != null) { + try { + combineObject = combineClass.newInstance(); + } catch (Exception e) { + logger.info("can't instantiate object {}", e.getMessage()); + } + combineObject.configure(jobConf); + } + } + + @Override + public void emitTuples() + { + if (!emittedAll) { + try { + K1 key = reader.createKey(); + V1 val = reader.createValue(); + emittedAll = !reader.next(key, val); + if (!emittedAll) { + KeyHashValPair<K1, V1> keyValue = new KeyHashValPair<K1, V1>(key, val); + mapObject.map(keyValue.getKey(), keyValue.getValue(), outputCollector, reporter); + if (combineObject == null) { + List<KeyHashValPair<K2, V2>> list = ((OutputCollectorImpl<K2, V2>)outputCollector).getList(); + for (KeyHashValPair<K2, V2> e : list) { + output.emit(e); + } + list.clear(); + } + } + } catch (IOException ex) { + logger.debug(ex.toString()); + throw new RuntimeException(ex); + } + } + } + + @Override + public void endWindow() + { + List<KeyHashValPair<K2, V2>> list = ((OutputCollectorImpl<K2, V2>)outputCollector).getList(); + if (combineObject != null) { + Map<K2, List<V2>> cacheObject = new HashMap<K2, List<V2>>(); + for (KeyHashValPair<K2, V2> tuple : list) { + List<V2> cacheList = cacheObject.get(tuple.getKey()); + if (cacheList == null) { + cacheList = new ArrayList<V2>(); + cacheList.add(tuple.getValue()); + cacheObject.put(tuple.getKey(), cacheList); + } else { + cacheList.add(tuple.getValue()); + } + } + list.clear(); + OutputCollector<K2, V2> tempOutputCollector = new OutputCollectorImpl<K2, V2>(); + for (Map.Entry<K2, List<V2>> e : cacheObject.entrySet()) { + try { + combineObject.reduce(e.getKey(), 
e.getValue().iterator(), tempOutputCollector, reporter); + } catch (IOException e1) { + logger.info(e1.getMessage()); + } + } + list = ((OutputCollectorImpl<K2, V2>)tempOutputCollector).getList(); + for (KeyHashValPair<K2, V2> e : list) { + output.emit(e); + } + } + if (!emitLastCountOnce && emittedAll) { + outputCount.emit(new KeyHashValPair<Integer, Integer>(operatorId, -1)); + logger.info("emitting end of file {}", new KeyHashValPair<Integer, Integer>(operatorId, -1)); + emitLastCountOnce = true; + } + list.clear(); + } + + private InputSplit[] getSplits(JobConf conf, int numSplits, String path) throws Exception + { + FileInputFormat.setInputPaths(conf, new Path(path)); + if (inputFormat == null) { + inputFormat = inputFormatClass.newInstance(); + String inputFormatClassName = inputFormatClass.getName(); + if (inputFormatClassName.equals("org.apache.hadoop.mapred.TextInputFormat")) { + ((TextInputFormat)inputFormat).configure(conf); + } else if (inputFormatClassName.equals("org.apache.hadoop.mapred.KeyValueTextInputFormat")) { + ((KeyValueTextInputFormat)inputFormat).configure(conf); + } + } + return inputFormat.getSplits(conf, numSplits); + // return null; + } + + @Override + public void partitioned(Map<Integer, Partition<MapOperator<K1, V1, K2, V2>>> partitions) + { + } + + @SuppressWarnings("rawtypes") + @Override + public Collection<Partition<MapOperator<K1, V1, K2, V2>>> definePartitions(Collection<Partition<MapOperator<K1, V1, K2, V2>>> partitions, PartitioningContext context) + { + int tempPartitionCount = partitionCount; + + Collection c = partitions; + Collection<Partition<MapOperator<K1, V1, K2, V2>>> operatorPartitions = c; + Partition<MapOperator<K1, V1, K2, V2>> template; + Iterator<Partition<MapOperator<K1, V1, K2, V2>>> itr = operatorPartitions.iterator(); + template = itr.next(); + Configuration conf = new Configuration(); + SerializationFactory serializationFactory = new SerializationFactory(conf); + if (outstream.size() == 0) { + InputSplit[] 
splits; + try { + splits = getSplits(new JobConf(conf), tempPartitionCount, template.getPartitionedInstance().getDirName()); + } catch (Exception e1) { + logger.info(" can't get splits {}", e1.getMessage()); + throw new RuntimeException(e1); + } + Collection<Partition<MapOperator<K1, V1, K2, V2>>> operList = new ArrayList<Partition<MapOperator<K1, V1, K2, V2>>>(); + itr = operatorPartitions.iterator(); + int size = splits.length; + Serializer keySerializer = serializationFactory.getSerializer(splits[0].getClass()); + while (size > 0 && itr.hasNext()) { + Partition<MapOperator<K1, V1, K2, V2>> p = itr.next(); + MapOperator<K1, V1, K2, V2> opr = p.getPartitionedInstance(); + opr.setInputFormatClass(inputFormatClass); + opr.setMapClass(mapClass); + opr.setCombineClass(combineClass); + opr.setConfigFile(configFile); + try { + keySerializer.open(opr.getOutstream()); + keySerializer.serialize(splits[size - 1]); + opr.setInputSplitClass(splits[size - 1].getClass()); + } catch (IOException e) { + logger.info("error while serializing {}", e.getMessage()); + } + size--; + operList.add(p); + } + while (size > 0) { + MapOperator<K1, V1, K2, V2> opr = new MapOperator<K1, V1, K2, V2>(); + opr.setInputFormatClass(inputFormatClass); + opr.setMapClass(mapClass); + opr.setCombineClass(combineClass); + opr.setConfigFile(configFile); + try { + keySerializer.open(opr.getOutstream()); + keySerializer.serialize(splits[size - 1]); + opr.setInputSplitClass(splits[size - 1].getClass()); + } catch (IOException e) { + logger.info("error while serializing {}", e.getMessage()); + } + size--; + operList.add(new DefaultPartition<MapOperator<K1, V1, K2, V2>>(opr)); + } + try { + keySerializer.close(); + } catch (IOException e) { + throw new RuntimeException(e); + } + return operList; + } + return null; + } + + public ByteArrayOutputStream getOutstream() + { + return outstream; + } + + public void setOutstream(ByteArrayOutputStream outstream) + { + this.outstream = outstream; + } + + /** + * adding 
map code + */ + + private Class<? extends Mapper<K1, V1, K2, V2>> mapClass; + private Class<? extends Reducer<K2, V2, K2, V2>> combineClass; + + private transient Mapper<K1, V1, K2, V2> mapObject; + private transient Reducer<K2, V2, K2, V2> combineObject; + private transient Reporter reporter; + + private String configFile; + + public String getConfigFile() + { + return configFile; + } + + public void setConfigFile(String configFile) + { + this.configFile = configFile; + } + + private transient OutputCollector<K2, V2> outputCollector; + + public Class<? extends Mapper<K1, V1, K2, V2>> getMapClass() + { + return mapClass; + } + + public void setMapClass(Class<? extends Mapper<K1, V1, K2, V2>> mapClass) + { + this.mapClass = mapClass; + } + + public Class<? extends Reducer<K2, V2, K2, V2>> getCombineClass() + { + return combineClass; + } + + public void setCombineClass(Class<? extends Reducer<K2, V2, K2, V2>> combineClass) + { + this.combineClass = combineClass; + } + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/MapReduceApplication.java ---------------------------------------------------------------------- diff --git a/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/MapReduceApplication.java b/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/MapReduceApplication.java new file mode 100644 index 0000000..98f4dc7 --- /dev/null +++ b/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/MapReduceApplication.java @@ -0,0 +1,114 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.examples.mroperator; + +import java.util.StringTokenizer; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.Mapper; +import org.apache.hadoop.mapred.Reducer; +import com.datatorrent.api.DAG; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.api.annotation.ApplicationAnnotation; + + +/** + * <p> + * Abstract MapReduceApplication class. + * </p> + * + * @since 0.9.0 + */ +@ApplicationAnnotation(name = "MapReduceExample") +public abstract class MapReduceApplication<K1, V1, K2, V2> implements StreamingApplication +{ + Class<? extends InputFormat<K1, V1>> inputFormat; + Class<? extends Mapper<K1, V1, K2, V2>> mapClass; + Class<? extends Reducer<K2, V2, K2, V2>> reduceClass; + Class<? extends Reducer<K2, V2, K2, V2>> combineClass; + + public Class<? extends Reducer<K2, V2, K2, V2>> getCombineClass() + { + return combineClass; + } + + public void setCombineClass(Class<? extends Reducer<K2, V2, K2, V2>> combineClass) + { + this.combineClass = combineClass; + } + + public void setInputFormat(Class<? extends InputFormat<K1, V1>> inputFormat) + { + this.inputFormat = inputFormat; + } + + public Class<? extends Mapper<K1, V1, K2, V2>> getMapClass() + { + return mapClass; + } + + public void setMapClass(Class<? extends Mapper<K1, V1, K2, V2>> mapClass) + { + this.mapClass = mapClass; + } + + public Class<? extends Reducer<K2, V2, K2, V2>> getReduceClass() + { + return reduceClass; + } + + public void setReduceClass(Class<? 
extends Reducer<K2, V2, K2, V2>> reduceClass) + { + this.reduceClass = reduceClass; + } + + + @Override + public void populateDAG(DAG dag, Configuration conf) + { + String configurationFilePath = conf.get(this.getClass().getSimpleName() + ".configFile", ""); + + MapOperator<K1, V1, K2, V2> inputOperator = dag.addOperator("Mapper", new MapOperator<K1, V1, K2, V2>()); + inputOperator.setInputFormatClass(inputFormat); + + String configFileName = null; + if (configurationFilePath != null && !configurationFilePath.isEmpty()) { + StringTokenizer configFileTokenizer = new StringTokenizer(configurationFilePath, "/"); + configFileName = configFileTokenizer.nextToken(); + while (configFileTokenizer.hasMoreTokens()) { + configFileName = configFileTokenizer.nextToken(); + } + } + + inputOperator.setMapClass(mapClass); + inputOperator.setConfigFile(configFileName); + inputOperator.setCombineClass(combineClass); + + ReduceOperator<K2, V2, K2, V2> reduceOpr = dag.addOperator("Reducer", new ReduceOperator<K2, V2, K2, V2>()); + reduceOpr.setReduceClass(reduceClass); + reduceOpr.setConfigFile(configFileName); + + HdfsKeyValOutputOperator<K2, V2> console = dag.addOperator("Console", new HdfsKeyValOutputOperator<K2, V2>()); + + dag.addStream("Mapped-Output", inputOperator.output, reduceOpr.input); + dag.addStream("Mapper-Count", inputOperator.outputCount, reduceOpr.inputCount); + dag.addStream("Reduced-Output", reduceOpr.output, console.input); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/NewWordCountApplication.java ---------------------------------------------------------------------- diff --git a/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/NewWordCountApplication.java b/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/NewWordCountApplication.java new file mode 100644 index 0000000..db32b4a --- /dev/null +++ 
b/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/NewWordCountApplication.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.examples.mroperator; + +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.TextInputFormat; + +import com.datatorrent.api.annotation.ApplicationAnnotation; + +/** + * <p>NewWordCountApplication class.</p> + * + * @since 0.9.0 + */ +@ApplicationAnnotation(name = "WordCountExample") +public class NewWordCountApplication extends MapReduceApplication<LongWritable, Text, Text, IntWritable> +{ + + public void NewWordCountApplication() + { + setMapClass(WordCount.Map.class); + setReduceClass(WordCount.Reduce.class); + setCombineClass(WordCount.Reduce.class); + setInputFormat(TextInputFormat.class); + } +}
