http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/MRStatusObject.java
----------------------------------------------------------------------
diff --git a/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/MRStatusObject.java b/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/MRStatusObject.java
deleted file mode 100644
index 481f3dc..0000000
--- a/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/MRStatusObject.java
+++ /dev/null
@@ -1,501 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.demos.mrmonitor;
-
-import java.util.LinkedList;
-import java.util.Map;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import org.codehaus.jettison.json.JSONArray;
-import org.codehaus.jettison.json.JSONException;
-import org.codehaus.jettison.json.JSONObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * <p>
- * MRStatusObject class.
- * </p>
- *
- * @since 0.3.4
- */
-public class MRStatusObject
-{
-  private String command;
-  /**
-   * This stores the Resource Manager/ Task Manager's host information
-   */
-  private String uri;
-  /**
-   * This field stores the job id
-   */
-  private String jobId;
-  /**
-   * This field stores the api version of the rest apis
-   */
-  private String apiVersion;
-  /**
-   * This field stores the hadoop version 1 for 1.x and 2 for 2.x
-   */
-  private int hadoopVersion;
-  /**
-   * This field stores the app id for the hadoop 2.x
-   */
-  private String appId;
-  /**
-   * This field stores the RM port information for hadoop 2.x / Task Manager server port for hadoop 1.X from where we
-   * can get the job information
-   */
-  private int rmPort;
-  /**
-   * This field stores the history server information for hadoop 2.x from where we can get the job information
-   */
-  private int historyServerPort;
-  /**
-   * This field stores the job information as json object
-   */
-  private JSONObject jsonObject;
-  /**
-   * This field tells if the object has been modified
-   */
-  private boolean modified;
-  /**
-   * This stores the mapping of map task ids to the TaskObject
-   */
-  private Map<String, TaskObject> mapJsonObject;
-  /**
-   * This stores the mapping of reduce task ids to the TaskObject
-   */
-  private Map<String, TaskObject> reduceJsonObject;
-  /**
-   * This holds the information about the various metrics like MAP_OUTPUT_RECORDS etc for this job
-   */
-  private TaskObject metricObject;
-
-  /**
-   * This holds the number of windows occurred when the new data was retrieved for this job
-   */
-  private int retrials;
-
-  /**
-   * The scheduler is used to store the job status every 1 minute
-   */
-  private transient ScheduledExecutorService statusScheduler;
-
-  /**
-   * This stores the progress of the map tasks
-   */
-  Queue<String> mapStatusHistory;
-
-  /**
-   * This stores the progress of the reduce tasks
-   */
-  Queue<String> reduceStatusHistory;
-
-  /**
-   * This stores the history of the physical memory usage
-   */
-  Queue<String> physicalMemoryStatusHistory;
-
-  /**
-   * This stores the history of the virtual memory usage
-   */
-  Queue<String> virtualMemoryStatusHistory;
-
-  /**
-   * This stores the history of the cpu
-   */
-  Queue<String> cpuStatusHistory;
-
-  /**
-   * The number of minutes for which the status history of map and reduce tasks is stored
-   */
-  private int statusHistoryCount = 60;
-
-  /**
-   * This field notifies if history status queues have changed over time
-   */
-  private boolean changedHistoryStatus;
-
-  public MRStatusObject()
-  {
-    retrials = 0;
-    modified = true;
-    mapJsonObject = new ConcurrentHashMap<String, TaskObject>();
-    reduceJsonObject = new ConcurrentHashMap<String, TaskObject>();
-    mapStatusHistory = new LinkedList<String>();
-    reduceStatusHistory = new LinkedList<String>();
-    physicalMemoryStatusHistory = new LinkedList<String>();
-    virtualMemoryStatusHistory = new LinkedList<String>();
-    cpuStatusHistory = new LinkedList<String>();
-    statusScheduler = Executors.newScheduledThreadPool(1);
-    statusScheduler.scheduleAtFixedRate(new Runnable()
-    {
-      @Override
-      public void run()
-      {
-        if (jsonObject != null) {
-          changedHistoryStatus = true;
-          if (mapStatusHistory.size() > statusHistoryCount) {
-            mapStatusHistory.poll();
-            reduceStatusHistory.poll();
-            physicalMemoryStatusHistory.poll();
-            virtualMemoryStatusHistory.poll();
-            cpuStatusHistory.poll();
-          }
-          if (hadoopVersion == 2) {
-            try {
-              mapStatusHistory.add(jsonObject.getJSONObject("job").getString("mapProgress"));
-              reduceStatusHistory.add(jsonObject.getJSONObject("job").getString("reduceProgress"));
-              if (metricObject.getJson() != null) {
-                JSONArray arr = metricObject.getJson().getJSONObject("jobCounters").getJSONArray("counterGroup");
-                int length = arr.length();
-                for (int i = 0; i < length; i++) {
-                  if (arr.getJSONObject(i).get("counterGroupName").equals("org.apache.hadoop.mapreduce.TaskCounter")) {
-                    JSONArray counters = arr.getJSONObject(i).getJSONArray("counter");
-                    for (int j = 0; j < counters.length(); j++) {
-                      JSONObject counterObj = counters.getJSONObject(j);
-                      if (counterObj.get("name").equals("PHYSICAL_MEMORY_BYTES")) {
-                        physicalMemoryStatusHistory.add(counterObj.getString("totalCounterValue"));
-                      } else if (counterObj.get("name").equals("VIRTUAL_MEMORY_BYTES")) {
-                        virtualMemoryStatusHistory.add(counterObj.getString("totalCounterValue"));
-                      } else if (counterObj.get("name").equals("CPU_MILLISECONDS")) {
-                        cpuStatusHistory.add(counterObj.getString("totalCounterValue"));
-                      }
-                    }
-                    break;
-                  }
-                }
-              }
-            } catch (JSONException e) {
-              logger.error("error setting status history {}", e.getMessage());
-            }
-          } else {
-            try {
-              mapStatusHistory.add(jsonObject.getJSONObject("mapTaskSummary").getString("progressPercentage"));
-              reduceStatusHistory.add(jsonObject.getJSONObject("reduceTaskSummary").getString("progressPercentage"));
-              JSONArray arr = jsonObject.getJSONArray("jobCounterGroupsInfo");
-              int length = arr.length();
-              for (int i = 0; i < length; i++) {
-                if (arr.getJSONObject(i).get("groupName").equals("Map-Reduce Framework")) {
-                  JSONArray counters = arr.getJSONObject(i).getJSONArray("jobCountersInfo");
-                  for (int j = 0; j < counters.length(); j++) {
-                    JSONObject counterObj = counters.getJSONObject(j);
-                    if (counterObj.get("name").equals("Physical memory (bytes) snapshot")) {
-                      physicalMemoryStatusHistory.add(counterObj.getString("totalValue"));
-                    } else if (counterObj.get("name").equals("Virtual memory (bytes) snapshot")) {
-                      virtualMemoryStatusHistory.add(counterObj.getString("totalValue"));
-                    } else if (counterObj.get("name").equals("CPU time spent (ms)")) {
-                      cpuStatusHistory.add(counterObj.getString("totalValue"));
-                    }
-                  }
-                  break;
-                }
-              }
-            } catch (JSONException e) {
-              logger.error("error setting status history {}", e.getMessage());
-            }
-          }
-        }
-      }
-    }, 0, 1, TimeUnit.MINUTES);
-  }
-
-  public Map<String, TaskObject> getMapJsonObject()
-  {
-    return mapJsonObject;
-  }
-
-  public void setMapJsonObject(Map<String, TaskObject> mapJsonObject)
-  {
-    this.mapJsonObject = mapJsonObject;
-  }
-
-  public Map<String, TaskObject> getReduceJsonObject()
-  {
-    return reduceJsonObject;
-  }
-
-  public void setReduceJsonObject(Map<String, TaskObject> reduceJsonObject)
-  {
-    this.reduceJsonObject = reduceJsonObject;
-  }
-
-  public String getUri()
-  {
-    return uri;
-  }
-
-  public void setUri(String uri)
-  {
-    this.uri = uri;
-  }
-
-  public String getJobId()
-  {
-    return jobId;
-  }
-
-  public void setJobId(String jobId)
-  {
-    this.jobId = jobId;
-  }
-
-  public String getApiVersion()
-  {
-    return apiVersion;
-  }
-
-  public void setApiVersion(String apiVersion)
-  {
-    this.apiVersion = apiVersion;
-  }
-
-  public int getHadoopVersion()
-  {
-    return hadoopVersion;
-  }
-
-  public void setHadoopVersion(int hadoopVersion)
-  {
-    this.hadoopVersion = hadoopVersion;
-  }
-
-  public String getAppId()
-  {
-    return appId;
-  }
-
-  public void setAppId(String appId)
-  {
-    this.appId = appId;
-  }
-
-  public int getRmPort()
-  {
-    return rmPort;
-  }
-
-  public void setRmPort(int rmPort)
-  {
-    this.rmPort = rmPort;
-  }
-
-  public int getHistoryServerPort()
-  {
-    return historyServerPort;
-  }
-
-  public void setHistoryServerPort(int historyServerPort)
-  {
-    this.historyServerPort = historyServerPort;
-  }
-
-  public JSONObject getJsonObject()
-  {
-    return jsonObject;
-  }
-
-  public void setJsonObject(JSONObject jsonObject)
-  {
-    this.jsonObject = jsonObject;
-  }
-
-  public boolean isChangedHistoryStatus()
-  {
-    return changedHistoryStatus;
-  }
-
-  public void setChangedHistoryStatus(boolean changedHistoryStatus)
-  {
-    this.changedHistoryStatus = changedHistoryStatus;
-  }
-
-  @Override
-  public boolean equals(Object that)
-  {
-    if (this == that) {
-      return true;
-    }
-    if (!(that instanceof MRStatusObject)) {
-      return false;
-    }
-    if (this.hashCode() == that.hashCode()) {
-      return true;
-    }
-    return false;
-  }
-
-  @Override
-  public int hashCode()
-  {
-    return (uri + jobId + apiVersion + String.valueOf(hadoopVersion)).hashCode();
-
-  }
-
-  public String getCommand()
-  {
-    return command;
-  }
-
-  public void setCommand(String command)
-  {
-    this.command = command;
-  }
-
-  public boolean isModified()
-  {
-    return modified;
-  }
-
-  public void setModified(boolean modified)
-  {
-    this.modified = modified;
-  }
-
-  public int getRetrials()
-  {
-    return retrials;
-  }
-
-  public void setRetrials(int retrials)
-  {
-    this.retrials = retrials;
-  }
-
-  public TaskObject getMetricObject()
-  {
-    return metricObject;
-  }
-
-  public void setMetricObject(TaskObject metricObject)
-  {
-    this.metricObject = metricObject;
-  }
-
-  public int getStatusHistoryCount()
-  {
-    return statusHistoryCount;
-  }
-
-  public void setStatusHistoryCount(int statusHistoryCount)
-  {
-    this.statusHistoryCount = statusHistoryCount;
-  }
-
-  public Queue<String> getMapStatusHistory()
-  {
-    return mapStatusHistory;
-  }
-
-  public Queue<String> getReduceStatusHistory()
-  {
-    return reduceStatusHistory;
-  }
-
-  public Queue<String> getPhysicalMemeoryStatusHistory()
-  {
-    return physicalMemoryStatusHistory;
-  }
-
-  public Queue<String> getVirtualMemoryStatusHistory()
-  {
-    return virtualMemoryStatusHistory;
-  }
-
-  public Queue<String> getCpuStatusHistory()
-  {
-    return cpuStatusHistory;
-  }
-
-  public static class TaskObject
-  {
-    /**
-     * This field stores the task information as json
-     */
-    private JSONObject json;
-    /**
-     * This field tells if the object was modified
-     */
-    private boolean modified;
-
-    public TaskObject(JSONObject json)
-    {
-      modified = true;
-      this.json = json;
-    }
-
-    /**
-     * This returns the task information as json
-     *
-     * @return
-     */
-    public JSONObject getJson()
-    {
-      return json;
-    }
-
-    /**
-     * This stores the task information as json
-     *
-     * @param json
-     */
-    public void setJson(JSONObject json)
-    {
-      this.json = json;
-    }
-
-    /**
-     * This returns if the json object has been modified
-     *
-     * @return
-     */
-    public boolean isModified()
-    {
-      return modified;
-    }
-
-    /**
-     * This sets if the json object is modified
-     *
-     * @param modified
-     */
-    public void setModified(boolean modified)
-    {
-      this.modified = modified;
-    }
-
-    /**
-     * This returns the string format of the json object
-     *
-     * @return
-     */
-    public String getJsonString()
-    {
-      return json.toString();
-    }
-  }
-
-  private static Logger logger = LoggerFactory.getLogger(MRStatusObject.class);
-
-}
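
For context, a minimal sketch (not part of this commit) of how the removed MRStatusObject identified a job: equals() and hashCode() are derived solely from uri, jobId, apiVersion and hadoopVersion, so two objects describing the same job collapse to one entry in hash-based collections. The host and job id below are hypothetical.

    // Hypothetical usage of the class deleted above.
    MRStatusObject status = new MRStatusObject();
    status.setUri("rm-host.example.com");        // hypothetical Resource Manager host
    status.setJobId("job_1234567890123_0001");   // hypothetical job id
    status.setApiVersion("v1");
    status.setHadoopVersion(2);
    // Any other MRStatusObject built with the same four values compares equal to 'status'.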

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/MRUtil.java
----------------------------------------------------------------------
diff --git a/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/MRUtil.java b/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/MRUtil.java
deleted file mode 100644
index 0d7f6af..0000000
--- a/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/MRUtil.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.demos.mrmonitor;
-
-import java.io.IOException;
-
-import org.codehaus.jettison.json.JSONObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.http.client.ClientProtocolException;
-import org.apache.http.client.HttpClient;
-import org.apache.http.client.ResponseHandler;
-import org.apache.http.client.methods.HttpGet;
-import org.apache.http.impl.client.BasicResponseHandler;
-import org.apache.http.impl.client.DefaultHttpClient;
-
-/**
- * <p>
- * Util class.
- * </p>
- *
- * @since 0.3.4
- */
-public class MRUtil
-{
-
-  private static final Logger logger = LoggerFactory.getLogger(MRUtil.class);
-
-  /**
-   * This method returns the response content for a given url
-   * @param url
-   * @return
-   */
-  public static String getJsonForURL(String url)
-  {
-    HttpClient httpclient = new DefaultHttpClient();
-    logger.debug(url);
-    try {
-
-
-      HttpGet httpget = new HttpGet(url);
-
-      // Create a response handler
-      ResponseHandler<String> responseHandler = new BasicResponseHandler();
-      String responseBody;
-      try {
-        responseBody = httpclient.execute(httpget, responseHandler);
-
-      } catch (ClientProtocolException e) {
-        logger.debug(e.getMessage());
-        return null;
-
-      } catch (IOException e) {
-        logger.debug(e.getMessage());
-        return null;
-      } catch (Exception e) {
-        logger.debug(e.getMessage());
-        return null;
-      }
-      return responseBody.trim();
-    } finally {
-      httpclient.getConnectionManager().shutdown();
-    }
-  }
-
-  /**
-   * This method returns the JSONObject for a given string
-   * @param json
-   * @return
-   */
-  public static JSONObject getJsonObject(String json)
-  {
-    try {
-      JSONObject jsonObj = new JSONObject(json);
-      return jsonObj;
-    } catch (Exception e) {
-      logger.debug("{}", e.getMessage());
-      return null;
-    }
-  }
-
-}
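
For context, a minimal sketch (not part of this commit) of how the removed MRUtil helpers might be combined; the REST URL is a hypothetical example.

    // Fetch a REST response and parse it; both helpers return null on failure.
    String response = MRUtil.getJsonForURL("http://rm-host.example.com:8088/ws/v1/cluster");
    JSONObject parsed = (response == null) ? null : MRUtil.getJsonObject(response);
    // 'parsed' is null if the HTTP call failed or the body was not valid JSON.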

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/MapToMRObjectOperator.java
----------------------------------------------------------------------
diff --git a/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/MapToMRObjectOperator.java b/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/MapToMRObjectOperator.java
deleted file mode 100644
index 5075163..0000000
--- a/demos/mrmonitor/src/main/java/com/datatorrent/demos/mrmonitor/MapToMRObjectOperator.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.demos.mrmonitor;
-
-import java.util.Map;
-
-import com.datatorrent.api.Context.OperatorContext;
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.Operator;
-
-/**
- * <p>MapToMRObjectOperator class.</p>
- *
- * @since 0.9.0
- */
-public class MapToMRObjectOperator implements Operator
-{
-
-  public final transient DefaultInputPort<Map<String, String>> input = new DefaultInputPort<Map<String, String>>()
-  {
-    @Override
-    public void process(Map<String, String> tuple)
-    {
-      MRStatusObject mrStatusObj = new MRStatusObject();
-
-      for (Map.Entry<String, String> e : tuple.entrySet()) {
-        if (e.getKey().equals(Constants.QUERY_KEY_COMMAND)) {
-          mrStatusObj.setCommand(e.getValue());
-        } else if (e.getKey().equals(Constants.QUERY_API_VERSION)) {
-          mrStatusObj.setApiVersion(e.getValue());
-        } else if (e.getKey().equals(Constants.QUERY_APP_ID)) {
-          mrStatusObj.setAppId(e.getValue());
-        } else if (e.getKey().equals(Constants.QUERY_HADOOP_VERSION)) {
-          mrStatusObj.setHadoopVersion(Integer.parseInt(e.getValue()));
-        } else if (e.getKey().equals(Constants.QUERY_HOST_NAME)) {
-          mrStatusObj.setUri(e.getValue());
-        } else if (e.getKey().equals(Constants.QUERY_HS_PORT)) {
-          mrStatusObj.setHistoryServerPort(Integer.parseInt(e.getValue()));
-        } else if (e.getKey().equals(Constants.QUERY_JOB_ID)) {
-          mrStatusObj.setJobId(e.getValue());
-        } else if (e.getKey().equals(Constants.QUERY_RM_PORT)) {
-          mrStatusObj.setRmPort(Integer.parseInt(e.getValue()));
-        }
-      }
-      output.emit(mrStatusObj);
-
-    }
-  };
-
-  public final transient DefaultOutputPort<MRStatusObject> output = new DefaultOutputPort<MRStatusObject>();
-
-  @Override
-  public void setup(OperatorContext context)
-  {
-  }
-
-  @Override
-  public void teardown()
-  {
-  }
-
-  @Override
-  public void beginWindow(long windowId)
-  {
-  }
-
-  @Override
-  public void endWindow()
-  {
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/mrmonitor/src/main/resources/META-INF/properties.xml
----------------------------------------------------------------------
diff --git a/demos/mrmonitor/src/main/resources/META-INF/properties.xml b/demos/mrmonitor/src/main/resources/META-INF/properties.xml
deleted file mode 100644
index 66de05e..0000000
--- a/demos/mrmonitor/src/main/resources/META-INF/properties.xml
+++ /dev/null
@@ -1,63 +0,0 @@
-<?xml version="1.0"?>
-<!--
-
-    Licensed to the Apache Software Foundation (ASF) under one
-    or more contributor license agreements.  See the NOTICE file
-    distributed with this work for additional information
-    regarding copyright ownership.  The ASF licenses this file
-    to you under the Apache License, Version 2.0 (the
-    "License"); you may not use this file except in compliance
-    with the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-    Unless required by applicable law or agreed to in writing,
-    software distributed under the License is distributed on an
-    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-    KIND, either express or implied.  See the License for the
-    specific language governing permissions and limitations
-    under the License.
-
--->
-<configuration>
-  <property>
-    <name>dt.application.MRMonitoringDemo.operator.JobMonitor.attr.PARTITIONER</name>
-    <value>com.datatorrent.common.partitioner.StatelessPartitioner:1</value>
-  </property>
-  <property>
-    <name>dt.application.MRMonitoringDemo.operator.JobMonitor.attr.APPLICATION_WINDOW_COUNT</name>
-    <value>4</value>
-  </property>
-  <property>
-    <name>dt.application.MRMonitoringDemo.operator.JobMonitor.prop.maxJobs</name>
-    <value>25</value>
-  </property>
-  <property>
-    <name>dt.application.MRMonitoringDemo.operator.JobMonitor.prop.maxJobs</name>
-    <value>25</value>
-  </property>
-  <property>
-    <name>dt.application.MRMonitoringDemo.operator.Query.prop.topic</name>
-    <value>contrib.summit.mrDebugger.mrDebuggerQuery</value>
-  </property>
-  <property>
-    <name>dt.application.MRMonitoringDemo.operator.JobOutput.prop.topic</name>
-    <value>contrib.summit.mrDebugger.jobResult</value>
-  </property>
-  <property>
-    <name>dt.application.MRMonitoringDemo.operator.MapJob.prop.topic</name>
-    <value>contrib.summit.mrDebugger.mapResult</value>
-  </property>
-  <property>
-    <name>dt.application.MRMonitoringDemo.operator.ReduceJob.prop.topic</name>
-    <value>contrib.summit.mrDebugger.reduceResult</value>
-  </property>
-  <property>
-    <name>dt.application.MRMonitoringDemo.operator.JobCounter.prop.topic</name>
-    <value>contrib.summit.mrDebugger.counterResult</value>
-  </property>
-  <property>
-    <name>dt.application.MRMonitoringDemo.stream.QueryConversion.locality</name>
-    <value>CONTAINER_LOCAL</value>
-  </property>
-</configuration>

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/mrmonitor/src/main/resources/mrdebugger.html
----------------------------------------------------------------------
diff --git a/demos/mrmonitor/src/main/resources/mrdebugger.html b/demos/mrmonitor/src/main/resources/mrdebugger.html
deleted file mode 100644
index d8e6497..0000000
--- a/demos/mrmonitor/src/main/resources/mrdebugger.html
+++ /dev/null
@@ -1,237 +0,0 @@
-<!--
-
-    Licensed to the Apache Software Foundation (ASF) under one
-    or more contributor license agreements.  See the NOTICE file
-    distributed with this work for additional information
-    regarding copyright ownership.  The ASF licenses this file
-    to you under the Apache License, Version 2.0 (the
-    "License"); you may not use this file except in compliance
-    with the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-    Unless required by applicable law or agreed to in writing,
-    software distributed under the License is distributed on an
-    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-    KIND, either express or implied.  See the License for the
-    specific language governing permissions and limitations
-    under the License.
-
--->
-<!doctype html>
-<html>
-<head>
-<title>Mobile Demo</title>
-
-<META HTTP-EQUIV="CACHE-CONTROL" CONTENT="NO-CACHE">
-<meta name="viewport" content="initial-scale=1.0, user-scalable=no">
-<meta charset="utf-8">
-<script src="http://ajax.googleapis.com/ajax/libs/jquery/1.7.1/jquery.min.js"></script>
-<script src="http://ajax.googleapis.com/ajax/libs/jqueryui/1.10.3/jquery-ui.min.js"></script>
-<!--script src="js/vendor/jquery/dist/jquery.js"></script-->
-<style>
-       body {
-        margin: 0;
-       }
-
-    .phone-input {
-        margin-left: 0.5em;
-        margin-right: 0.5em;
-    }
-</style>
-
-</head>
-
-
-<body>
-
-
-<script>
-
-
-
-var map;
-var markers = {};
-
-$(function() {
-
-    $("#query1AddButton").click(function() {
-    
-           var app_id = $("input#app_id").val();
-           var hostname = $("input#hostname").val();
-           var job_id = $("input#job_id").val();
-           var hadoop_version= $("input#hadoop_version").val();
-           var api_version = $("input#api_version").val();
-           var rm_port = $("input#rm_port").val();
-           var hs_port = $("input#hs_port").val();
-           
-           var jsonData = {
-               command : 'add',
-               hostname:hostname,
-               app_id:app_id,
-               job_id:job_id,
-               hadoop_version:hadoop_version,
-               api_version:api_version,
-               rm_port:rm_port,
-               hs_port:hs_port
-           };
-    
-           sendQuery(jsonData, function() {
-               $('#query1SubmitConfirm').html("<div id='message'></div>");
-               $('#message').html("<h2>Add   submitted to application!</h2>")
-               .append("<p>Result will appear on page shortly.</p>");
-            });
-           
-           return false;
-    });
-
-    $("#query1DeleteButton").click(function() {
-    
-           var job_id = $("input#job_id").val();
-           
-           var jsonData = {
-               command : 'delete',
-               query : job_id 
-           };
-    
-           sendQuery(jsonData, function() {
-               $('#query1SubmitConfirm').html("<div id='message'></div>");
-               $('#message').html("<h2>Add " + phone + " submitted to application!</h2>")
-               .append("<p>Result will appear on page shortly.</p>");
-            });
-           
-           return false;
-    });
-
-    function sendQuery(jsonData, callback) {
-        var ws = new WebSocket('ws://'+window.location.host+'/pubsub');
-
-        ws.onopen = function () {
-          var topic = "contrib.summit.mrDebugger.mrDebuggerQuery";  
-          var msg = JSON.stringify({ "type" : "publish", "topic" : topic, "data" : jsonData });
-          ws.send(msg);
-          console.log("published to: " + topic + " data: " + msg);
-          ws.close();
-          if (callback) callback();
-        };
-
-        ws.onerror = function (error) {
-          console.log('WebSocket Error ' + error);
-        };
-
-        ws.onmessage = function (e) {
-          console.log('Server: ' + e.data);
-        };
-        ws.onclose = function (e) {
-            console.log('close: ' , e);
-        };
-
-    }
-
-    var ws = new WebSocket('ws://'+window.location.host+'/pubsub');
-    var topic = "contrib.summit.mrDebugger.jobResult";  
-
-    ws.onopen = function () {
-      var msg = JSON.stringify({ "type":"subscribe", "topic": topic});
-      console.log("sending: " + msg);
-      ws.send(msg);
-    };
-
-    ws.onerror = function (error) {
-      console.log('WebSocket Error ' + error);
-    };
-
-    ws.onmessage = function (e){ 
-
-       $('#jobQueryResult').append(e.data+"\n");       
-    };      
-    
-
-    var mapws = new WebSocket('ws://'+window.location.host+'/pubsub');
-    var maptopic = "contrib.summit.mrDebugger.mapResult";  
-
-    mapws.onopen = function () {
-      var msg = JSON.stringify({ "type":"subscribe", "topic": maptopic});
-      console.log("sending: " + msg);
-      mapws.send(msg);
-    };
-
-    mapws.onerror = function (error) {
-      console.log('WebSocket Error ' + error);
-    };
-
-    mapws.onmessage = function (e){ 
-
-       $('#jobMapQueryResult').append(e.data+"\n");    
-    };      
-
-
-    var reducews = new WebSocket('ws://'+window.location.host+'/pubsub');
-    var reducetopic = "contrib.summit.mrDebugger.reduceResult";  
-
-    reducews.onopen = function () {
-      var msg = JSON.stringify({ "type":"subscribe", "topic": reducetopic});
-      console.log("sending: " + msg);
-      reducews.send(msg);
-    };
-
-    reducews.onerror = function (error) {
-      console.log('WebSocket Error ' + error);
-    };
-
-    reducews.onmessage = function (e){ 
-
-       $('#jobReduceQueryResult').append(e.data+"\n"); 
-    };      
-    
-  });
-
-
-</script>
-
-
-<div id="query1FormDiv">
-                 <form name="query1" action="">
-                   <p>
-                     <label for="phone" id="app_id_label">Application Id</label>
-                     <input type="text" name="app_id" id="app_id" size="30" value="" class="phone-input" />
-                   </p>
-                   <p>
-                     <label for="phone" id="job_id_label">Job Id</label>
-                     <input type="text" name="job_id" id="job_id" size="30" value="" class="phone-input" />
-                   </p>
-                   <p>
-                     <label for="phone" id="hostname_label">Hostname</label>
-                     <input type="text" name="hostname" id="hostname" size="30" value="" class="phone-input" />
-                   </p>
-                   <p>
-                     <label for="phone" id="rm_port_label">RM port</label>
-                     <input type="text" name="rm_port" id="rm_port" size="30" value="" class="phone-input" />
-                   </p>
-                   <p>
-                     <label for="phone" id="hs_port_label">History Server port</label>
-                     <input type="text" name="hs_port" id="hs_port" size="30" value="" class="phone-input" />
-                   </p>
-                   <p>
-                     <label for="phone" id="hadoop_version_label">Hadoop Version</label>
-                     <input type="text" name="hadoop_version" id="hadoop_version" size="30" value="" class="phone-input" />
-                   </p>
-                   <p>
-                     <label for="phone" id="api_version_label">API Version</label>
-                     <input type="text" name="api_version" id="api_version" size="30" value="" class="phone-input" />
-                   </p>
-            <p>
-                     <input type="submit" name="command" class="button" id="query1AddButton" value="Add" />
-                     <input type="submit" name="command" class="button" id="query1DeleteButton" value="Delete" />
-                     <input type="submit" name="command" class="button" id="query1ClearButton" value="Clear" />
-            </p>
-                 </form>
-                 <div id="query1SubmitConfirm"></div>
-                 <div>Job: <span id="jobQueryResult"></span></div>
-                 <div>Map Task: <span id="jobMapQueryResult"></span></div>
-                 <div>Reduce Job: <span id="jobReduceQueryResult"></span></div>
-               </div>
-
-</body>
-</html>
-

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/mrmonitor/src/site/conf/my-app-conf1.xml
----------------------------------------------------------------------
diff --git a/demos/mrmonitor/src/site/conf/my-app-conf1.xml b/demos/mrmonitor/src/site/conf/my-app-conf1.xml
deleted file mode 100644
index f35873b..0000000
--- a/demos/mrmonitor/src/site/conf/my-app-conf1.xml
+++ /dev/null
@@ -1,27 +0,0 @@
-<?xml version="1.0" encoding="UTF-8" standalone="no"?>
-<!--
-
-    Licensed to the Apache Software Foundation (ASF) under one
-    or more contributor license agreements.  See the NOTICE file
-    distributed with this work for additional information
-    regarding copyright ownership.  The ASF licenses this file
-    to you under the Apache License, Version 2.0 (the
-    "License"); you may not use this file except in compliance
-    with the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-    Unless required by applicable law or agreed to in writing,
-    software distributed under the License is distributed on an
-    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-    KIND, either express or implied.  See the License for the
-    specific language governing permissions and limitations
-    under the License.
-
--->
-<configuration>
-  <property>
-    <name>dt.attr.MASTER_MEMORY_MB</name>
-    <value>1024</value>
-  </property>
-</configuration>

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/mrmonitor/src/test/java/com/datatorrent/demos/mrmonitor/MrMonitoringApplicationTest.java
----------------------------------------------------------------------
diff --git a/demos/mrmonitor/src/test/java/com/datatorrent/demos/mrmonitor/MrMonitoringApplicationTest.java b/demos/mrmonitor/src/test/java/com/datatorrent/demos/mrmonitor/MrMonitoringApplicationTest.java
deleted file mode 100644
index ad8de02..0000000
--- a/demos/mrmonitor/src/test/java/com/datatorrent/demos/mrmonitor/MrMonitoringApplicationTest.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.demos.mrmonitor;
-
-import javax.servlet.Servlet;
-
-import org.eclipse.jetty.server.Connector;
-import org.eclipse.jetty.server.Server;
-import org.eclipse.jetty.servlet.ServletContextHandler;
-import org.eclipse.jetty.servlet.ServletHolder;
-import org.junit.Test;
-
-import org.apache.hadoop.conf.Configuration;
-
-import com.datatorrent.api.LocalMode;
-import com.datatorrent.lib.helper.SamplePubSubWebSocketServlet;
-
-/**
- * <p>MapReduceDebuggerApplicationTest class.</p>
- *
- * @since 0.3.4
- */
-
-public class MrMonitoringApplicationTest
-{
-
-  @Test
-  public void testApplication() throws Exception
-  {
-    Configuration conf = new Configuration(false);
-    conf.addResource("dt-site-monitoring.xml");
-    Server server = new Server(0);
-    Servlet servlet = new SamplePubSubWebSocketServlet();
-    ServletHolder sh = new ServletHolder(servlet);
-    ServletContextHandler contextHandler = new ServletContextHandler(server, "/", ServletContextHandler.SESSIONS);
-    contextHandler.addServlet(sh, "/pubsub");
-    contextHandler.addServlet(sh, "/*");
-    server.start();
-    Connector[] connector = server.getConnectors();
-    conf.set("dt.attr.GATEWAY_CONNECT_ADDRESS", "localhost:" + connector[0].getLocalPort());
-
-    MRMonitoringApplication application = new MRMonitoringApplication();
-    LocalMode lma = LocalMode.newInstance();
-    lma.prepareDAG(application, conf);
-    LocalMode.Controller lc = lma.getController();
-    lc.run(10000);
-    server.stop();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/mrmonitor/src/test/resources/dt-site-monitoring.xml
----------------------------------------------------------------------
diff --git a/demos/mrmonitor/src/test/resources/dt-site-monitoring.xml b/demos/mrmonitor/src/test/resources/dt-site-monitoring.xml
deleted file mode 100644
index d1e12c6..0000000
--- a/demos/mrmonitor/src/test/resources/dt-site-monitoring.xml
+++ /dev/null
@@ -1,63 +0,0 @@
-<?xml version="1.0"?>
-<!--
-
-    Licensed to the Apache Software Foundation (ASF) under one
-    or more contributor license agreements.  See the NOTICE file
-    distributed with this work for additional information
-    regarding copyright ownership.  The ASF licenses this file
-    to you under the Apache License, Version 2.0 (the
-    "License"); you may not use this file except in compliance
-    with the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-    Unless required by applicable law or agreed to in writing,
-    software distributed under the License is distributed on an
-    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-    KIND, either express or implied.  See the License for the
-    specific language governing permissions and limitations
-    under the License.
-
--->
-<configuration>
-  <property>
-    <name>dt.application.MRMonitoringDemo.operator.JobMonitor.attr.INITIAL_PARTITION_COUNT</name>
-    <value>1</value>
-  </property>
-  <property>
-    <name>dt.application.MRMonitoringDemo.operator.JobMonitor.attr.APPLICATION_WINDOW_COUNT</name>
-    <value>4</value>
-  </property>
-  <property>
-    <name>dt.application.MRMonitoringDemo.operator.JobMonitor.prop.maxJobs</name>
-    <value>25</value>
-  </property>
-  <property>
-    <name>dt.application.MRMonitoringDemo.operator.JobMonitor.prop.maxJobs</name>
-    <value>25</value>
-  </property>
-  <property>
-    <name>dt.application.MRMonitoringDemo.operator.Query.prop.topic</name>
-    <value>contrib.summit.mrDebugger.mrDebuggerQuery</value>
-  </property>
-  <property>
-    <name>dt.application.MRMonitoringDemo.operator.JobOutput.prop.topic</name>
-    <value>contrib.summit.mrDebugger.jobResult</value>
-  </property>
-  <property>
-    <name>dt.application.MRMonitoringDemo.operator.MapJob.prop.topic</name>
-    <value>contrib.summit.mrDebugger.mapResult</value>
-  </property>
-  <property>
-    <name>dt.application.MRMonitoringDemo.operator.ReduceJob.prop.topic</name>
-    <value>contrib.summit.mrDebugger.reduceResult</value>
-  </property>
-  <property>
-    <name>dt.application.MRMonitoringDemo.operator.JobCounter.prop.topic</name>
-    <value>contrib.summit.mrDebugger.counterResult</value>
-  </property>
-  <property>
-    <name>dt.application.MRMonitoringDemo.stream.QueryConversion.locality</name>
-    <value>CONTAINER_LOCAL</value>
-  </property>
-</configuration>

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/mrmonitor/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/demos/mrmonitor/src/test/resources/log4j.properties b/demos/mrmonitor/src/test/resources/log4j.properties
deleted file mode 100644
index cf0d19e..0000000
--- a/demos/mrmonitor/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,43 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-log4j.rootLogger=DEBUG,CONSOLE
-
-log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
-log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
-log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
-log4j.appender.CONSOLE.threshold=${test.log.console.threshold}
-test.log.console.threshold=DEBUG
-
-log4j.appender.RFA=org.apache.log4j.RollingFileAppender
-log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
-log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
-log4j.appender.RFA.File=/tmp/app.log
-
-# to enable, add SYSLOG to rootLogger
-log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender
-log4j.appender.SYSLOG.syslogHost=127.0.0.1
-log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout
-log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - %m%n
-log4j.appender.SYSLOG.Facility=LOCAL1
-
-log4j.logger.org=info
-#log4j.logger.org.apache.commons.beanutils=warn
-log4j.logger.com.datatorrent=debug
-log4j.logger.org.apache.apex=debug

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/mroperator/pom.xml
----------------------------------------------------------------------
diff --git a/demos/mroperator/pom.xml b/demos/mroperator/pom.xml
deleted file mode 100644
index fcc30ff..0000000
--- a/demos/mroperator/pom.xml
+++ /dev/null
@@ -1,56 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-
-    Licensed to the Apache Software Foundation (ASF) under one
-    or more contributor license agreements.  See the NOTICE file
-    distributed with this work for additional information
-    regarding copyright ownership.  The ASF licenses this file
-    to you under the Apache License, Version 2.0 (the
-    "License"); you may not use this file except in compliance
-    with the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-    Unless required by applicable law or agreed to in writing,
-    software distributed under the License is distributed on an
-    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-    KIND, either express or implied.  See the License for the
-    specific language governing permissions and limitations
-    under the License.
-
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-  <modelVersion>4.0.0</modelVersion>
-  
-  <artifactId>mroperator</artifactId>
-  <packaging>jar</packaging>
-
-  <name>Apache Apex Malhar MR Operator Demo</name>
-  <description></description>
-
-  <parent>
-    <groupId>org.apache.apex</groupId>
-    <artifactId>malhar-demos</artifactId>
-    <version>3.7.0-SNAPSHOT</version>
-  </parent>
-
-  <properties>
-    <skipTests>true</skipTests>
-  </properties>
-
-  <dependencies>
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-mapreduce-client-core</artifactId>
-      <version>${hadoop.version}</version>
-      <scope>provided</scope>
-      <exclusions>
-        <exclusion>
-          <groupId>*</groupId>
-          <artifactId>*</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-  </dependencies>
-
-</project>

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/mroperator/src/assemble/appPackage.xml
----------------------------------------------------------------------
diff --git a/demos/mroperator/src/assemble/appPackage.xml b/demos/mroperator/src/assemble/appPackage.xml
deleted file mode 100644
index 4138cf2..0000000
--- a/demos/mroperator/src/assemble/appPackage.xml
+++ /dev/null
@@ -1,59 +0,0 @@
-<!--
-
-    Licensed to the Apache Software Foundation (ASF) under one
-    or more contributor license agreements.  See the NOTICE file
-    distributed with this work for additional information
-    regarding copyright ownership.  The ASF licenses this file
-    to you under the Apache License, Version 2.0 (the
-    "License"); you may not use this file except in compliance
-    with the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-    Unless required by applicable law or agreed to in writing,
-    software distributed under the License is distributed on an
-    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-    KIND, either express or implied.  See the License for the
-    specific language governing permissions and limitations
-    under the License.
-
--->
-<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
-    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-    xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
-  <id>appPackage</id>
-  <formats>
-    <format>jar</format>
-  </formats>
-  <includeBaseDirectory>false</includeBaseDirectory>
-  <fileSets>
-    <fileSet>
-      <directory>${basedir}/target/</directory>
-      <outputDirectory>/app</outputDirectory>
-      <includes>
-        <include>${project.artifactId}-${project.version}.jar</include>
-      </includes>
-    </fileSet>
-    <fileSet>
-      <directory>${basedir}/target/deps</directory>
-      <outputDirectory>/lib</outputDirectory>
-    </fileSet>
-    <fileSet>
-      <directory>${basedir}/src/site/conf</directory>
-      <outputDirectory>/conf</outputDirectory>
-      <includes>
-        <include>*.xml</include>
-      </includes>
-    </fileSet>
-    <fileSet>
-      <directory>${basedir}/src/main/resources/META-INF</directory>
-      <outputDirectory>/META-INF</outputDirectory>
-    </fileSet>
-    <fileSet>
-      <directory>${basedir}/src/main/resources/app</directory>
-      <outputDirectory>/app</outputDirectory>
-    </fileSet>
-  </fileSets>
-
-</assembly>
-

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/DateWritable.java
----------------------------------------------------------------------
diff --git a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/DateWritable.java b/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/DateWritable.java
deleted file mode 100644
index 5dbd83f..0000000
--- a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/DateWritable.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.demos.mroperator;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-
-import org.apache.hadoop.io.WritableComparable;
-
-/**
- * <p>DateWritable class.</p>
- *
- * @since 0.9.0
- */
-public class DateWritable implements WritableComparable<DateWritable>
-{
-  private static final SimpleDateFormat formatter = new SimpleDateFormat( "yyyy-MM-dd' T 'HH:mm:ss.SSS" );
-  private Date date;
-
-  public Date getDate()
-  {
-    return date;
-  }
-
-  public void setDate( Date date )
-  {
-    this.date = date;
-  }
-
-  public void readFields( DataInput in ) throws IOException
-  {
-    date = new Date( in.readLong() );
-  }
-
-  public void write( DataOutput out ) throws IOException
-  {
-    out.writeLong( date.getTime() );
-  }
-
-  @Override
-  public boolean equals(Object o)
-  {
-    return toString().equals(o.toString());
-  }
-
-  @Override
-  public int hashCode()
-  {
-    return toString().hashCode();
-  }
-
-  public String toString()
-  {
-    return formatter.format( date);
-  }
-
-  public int compareTo( DateWritable other )
-  {
-    return date.compareTo( other.getDate() );
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/HdfsKeyValOutputOperator.java
----------------------------------------------------------------------
diff --git a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/HdfsKeyValOutputOperator.java b/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/HdfsKeyValOutputOperator.java
deleted file mode 100644
index c4b9c49..0000000
--- a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/HdfsKeyValOutputOperator.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.demos.mroperator;
-
-import com.datatorrent.lib.io.fs.AbstractSingleFileOutputOperator;
-import com.datatorrent.lib.util.KeyHashValPair;
-
-/**
- * Adapter for writing KeyHashValPair objects to HDFS
- * <p>
- * Serializes tuples into a HDFS file.<br/>
- * </p>
- *
- * @param <K> Key type
- * @param <V> Value type
- * @since 0.9.4
- */
-public class HdfsKeyValOutputOperator<K, V> extends AbstractSingleFileOutputOperator<KeyHashValPair<K, V>>
-{
-  @Override
-  public byte[] getBytesForTuple(KeyHashValPair<K,V> t)
-  {
-    return (t.toString() + "\n").getBytes();
-  }
-}
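
For context, a minimal sketch (not part of this commit) of the removed operator's serialization: each tuple is written out as its toString() form followed by a newline. The key and value shown are hypothetical, and the exact rendering depends on KeyHashValPair.toString().

    // Hypothetical illustration of getBytesForTuple on the class deleted above.
    HdfsKeyValOutputOperator<String, Integer> writer = new HdfsKeyValOutputOperator<String, Integer>();
    byte[] line = writer.getBytesForTuple(new KeyHashValPair<String, Integer>("word", 42));
    // 'line' holds the pair's string form plus "\n" in the platform default encoding.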

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/InvertedIndexApplication.java
----------------------------------------------------------------------
diff --git a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/InvertedIndexApplication.java b/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/InvertedIndexApplication.java
deleted file mode 100644
index 076b8ac..0000000
--- a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/InvertedIndexApplication.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.demos.mroperator;
-
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.TextInputFormat;
-
-import com.datatorrent.api.annotation.ApplicationAnnotation;
-
-/**
- * <p>InvertedIndexApplication class.</p>
- *
- * @since 0.9.0
- */
-@ApplicationAnnotation(name = "InvertedIndexDemo")
-public class InvertedIndexApplication extends MapReduceApplication<LongWritable, Text, Text, Text>
-{
-
-  InvertedIndexApplication()
-  {
-    setMapClass(LineIndexer.LineIndexMapper.class);
-    setReduceClass(LineIndexer.LineIndexReducer.class);
-
-    setInputFormat(TextInputFormat.class);
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/LineIndexer.java
----------------------------------------------------------------------
diff --git a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/LineIndexer.java b/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/LineIndexer.java
deleted file mode 100644
index e963954..0000000
--- a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/LineIndexer.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.demos.mroperator;
-
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.StringTokenizer;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
-
-/**
- * <p>LineIndexer class.</p>
- *
- * @since 0.9.0
- */
-public class LineIndexer
-{
-
-  public static class LineIndexMapper extends MapReduceBase
-      implements Mapper<LongWritable, Text, Text, Text>
-  {
-    private static final Text word = new Text();
-    private static final Text location = new Text();
-
-    public void map(LongWritable key, Text val,
-        OutputCollector<Text, Text> output, Reporter reporter) throws IOException
-    {
-      FileSplit fileSplit = (FileSplit)reporter.getInputSplit();
-      String fileName = fileSplit.getPath().getName();
-      location.set(fileName);
-
-      String line = val.toString();
-      StringTokenizer itr = new StringTokenizer(line.toLowerCase());
-      while (itr.hasMoreTokens()) {
-        word.set(itr.nextToken());
-        output.collect(word, location);
-      }
-    }
-  }
-
-
-
-  public static class LineIndexReducer extends MapReduceBase
-      implements Reducer<Text, Text, Text, Text>
-  {
-    public void reduce(Text key, Iterator<Text> values,
-        OutputCollector<Text, Text> output, Reporter reporter) throws IOException
-    {
-      boolean first = true;
-      StringBuilder toReturn = new StringBuilder();
-      while (values.hasNext()) {
-        if (!first) {
-          toReturn.append(", ");
-        }
-        first = false;
-        toReturn.append(values.next().toString());
-      }
-
-      output.collect(key, new Text(toReturn.toString()));
-    }
-  }
-
-
-  /**
-   * The actual main() method for our program; this is the
-   * "driver" for the MapReduce job.
-   */
-  public static void main(String[] args)
-  {
-    JobClient client = new JobClient();
-    JobConf conf = new JobConf(LineIndexer.class);
-
-    conf.setJobName("LineIndexer");
-
-    conf.setOutputKeyClass(Text.class);
-    conf.setOutputValueClass(Text.class);
-
-    FileInputFormat.addInputPath(conf, new Path("input"));
-    FileOutputFormat.setOutputPath(conf, new Path("output"));
-
-    conf.setMapperClass(LineIndexMapper.class);
-    conf.setReducerClass(LineIndexReducer.class);
-
-    client.setConf(conf);
-
-    try {
-      JobClient.runJob(conf);
-    } catch (Exception e) {
-      e.printStackTrace();
-    }
-  }
-}

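A quick way to sanity-check the deleted LineIndexReducer, assuming a checkout prior to this commit so the class is still on the classpath and Hadoop's old mapred API is available, is to drive it with an in-memory OutputCollector. The harness below is a hypothetical sketch, not part of this commit; it only shows that the reducer joins every file name seen for a word into one comma-separated list.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

import com.datatorrent.demos.mroperator.LineIndexer;

public class LineIndexReducerCheck
{
  public static void main(String[] args) throws Exception
  {
    final List<String> collected = new ArrayList<String>();
    // Minimal OutputCollector that records emitted pairs instead of writing to HDFS.
    OutputCollector<Text, Text> collector = new OutputCollector<Text, Text>()
    {
      @Override
      public void collect(Text key, Text value)
      {
        collected.add(key + " -> " + value);
      }
    };
    new LineIndexer.LineIndexReducer().reduce(new Text("hello"),
        Arrays.asList(new Text("a.txt"), new Text("b.txt")).iterator(), collector, Reporter.NULL);
    System.out.println(collected); // prints [hello -> a.txt, b.txt]
  }
}
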
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/LogCountsPerHour.java
----------------------------------------------------------------------
diff --git a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/LogCountsPerHour.java b/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/LogCountsPerHour.java
deleted file mode 100644
index 69ee892..0000000
--- a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/LogCountsPerHour.java
+++ /dev/null
@@ -1,187 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.demos.mroperator;
-
-import java.io.IOException;
-import java.util.Calendar;
-import java.util.Iterator;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.TextOutputFormat;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-
-/**
- * <p>LogCountsPerHour class.</p>
- *
- * @since 0.9.0
- */
-public class LogCountsPerHour extends Configured implements Tool
-{
-
-  public static class LogMapClass extends MapReduceBase
-      implements Mapper<LongWritable, Text, DateWritable, IntWritable>
-  {
-    private DateWritable date = new DateWritable();
-    private static final IntWritable one = new IntWritable(1);
-
-    public void map(LongWritable key, Text value, OutputCollector<DateWritable, IntWritable> output, Reporter reporter) throws IOException
-    {
-      // Get the value as a String; it is of the format:
-      // 111.111.111.111 - - [16/Dec/2012:05:32:50 -0500] "GET / HTTP/1.1" 200 14791 "-" "Mozilla/5.0 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)"
-      String text = value.toString();
-
-      // Get the date and time
-      int openBracket = text.indexOf('[');
-      int closeBracket = text.indexOf(']');
-      if (openBracket != -1 && closeBracket != -1) {
-        // Read the date
-        String dateString = text.substring(text.indexOf('[') + 1, text.indexOf(']'));
-
-        // Build a date object from a string of the form: 16/Dec/2012:05:32:50 -0500
-        int index = 0;
-        int nextIndex = dateString.indexOf('/');
-        int day = Integer.parseInt(dateString.substring(index, nextIndex));
-
-        index = nextIndex;
-        nextIndex = dateString.indexOf('/', index + 1);
-        String month = dateString.substring(index + 1, nextIndex);
-
-        index = nextIndex;
-        nextIndex = dateString.indexOf(':', index);
-        int year = Integer.parseInt(dateString.substring(index + 1, nextIndex));
-
-        index = nextIndex;
-        nextIndex = dateString.indexOf(':', index + 1);
-        int hour = Integer.parseInt(dateString.substring(index + 1, nextIndex));
-
-        // Build a calendar object for this date
-        Calendar calendar = Calendar.getInstance();
-        calendar.set(Calendar.DATE, day);
-        calendar.set(Calendar.YEAR, year);
-        calendar.set(Calendar.HOUR, hour);
-        calendar.set(Calendar.MINUTE, 0);
-        calendar.set(Calendar.SECOND, 0);
-        calendar.set(Calendar.MILLISECOND, 0);
-
-        if (month.equalsIgnoreCase("dec")) {
-          calendar.set(Calendar.MONTH, Calendar.DECEMBER);
-        } else if (month.equalsIgnoreCase("nov")) {
-          calendar.set(Calendar.MONTH, Calendar.NOVEMBER);
-        } else if (month.equalsIgnoreCase("oct")) {
-          calendar.set(Calendar.MONTH, Calendar.OCTOBER);
-        } else if (month.equalsIgnoreCase("sep")) {
-          calendar.set(Calendar.MONTH, Calendar.SEPTEMBER);
-        } else if (month.equalsIgnoreCase("aug")) {
-          calendar.set(Calendar.MONTH, Calendar.AUGUST);
-        } else if (month.equalsIgnoreCase("jul")) {
-          calendar.set(Calendar.MONTH, Calendar.JULY);
-        } else if (month.equalsIgnoreCase("jun")) {
-          calendar.set(Calendar.MONTH, Calendar.JUNE);
-        } else if (month.equalsIgnoreCase("may")) {
-          calendar.set(Calendar.MONTH, Calendar.MAY);
-        } else if (month.equalsIgnoreCase("apr")) {
-          calendar.set(Calendar.MONTH, Calendar.APRIL);
-        } else if (month.equalsIgnoreCase("mar")) {
-          calendar.set(Calendar.MONTH, Calendar.MARCH);
-        } else if (month.equalsIgnoreCase("feb")) {
-          calendar.set(Calendar.MONTH, Calendar.FEBRUARY);
-        } else if (month.equalsIgnoreCase("jan")) {
-          calendar.set(Calendar.MONTH, Calendar.JANUARY);
-        }
-
-
-        // Output the date as the key and 1 as the value
-        date.setDate(calendar.getTime());
-        output.collect(date, one);
-      }
-    }
-  }
-
-  public static class LogReduce extends MapReduceBase
-      implements Reducer<DateWritable, IntWritable, DateWritable, IntWritable>
-  {
-    public void reduce(DateWritable key, Iterator<IntWritable> values, OutputCollector<DateWritable, IntWritable> output, Reporter reporter) throws IOException
-    {
-      // Iterate over all of the values (counts of occurrences of this word)
-      int count = 0;
-      while (values.hasNext()) {
-        // Add the value to our count
-        count += values.next().get();
-      }
-
-      // Output the word with its count (wrapped in an IntWritable)
-      output.collect(key, new IntWritable(count));
-    }
-  }
-
-
-  public int run(String[] args) throws Exception
-  {
-    // Create a configuration
-    Configuration conf = getConf();
-
-    // Create a job from the default configuration that will use the LogCountsPerHour class
-    JobConf job = new JobConf(conf, LogCountsPerHour.class);
-
-    // Define our input path as the first command line argument and our output path as the second
-    Path in = new Path(args[0]);
-    Path out = new Path(args[1]);
-
-    // Create File Input/Output formats for these paths (in the job)
-    FileInputFormat.setInputPaths(job, in);
-    FileOutputFormat.setOutputPath(job, out);
-
-    // Configure the job: name, mapper, reducer, and combiner
-    job.setJobName("LogAveragePerHour");
-    job.setMapperClass(LogMapClass.class);
-    job.setReducerClass(LogReduce.class);
-    job.setCombinerClass(LogReduce.class);
-
-    // Configure the output
-    job.setOutputFormat(TextOutputFormat.class);
-    job.setOutputKeyClass(DateWritable.class);
-    job.setOutputValueClass(IntWritable.class);
-
-    // Run the job
-    JobClient.runJob(job);
-    return 0;
-  }
-
-  public static void main(String[] args) throws Exception
-  {
-    // Start the LogCountsPerHour MapReduce application
-    int res = ToolRunner.run(new Configuration(), new LogCountsPerHour(), args);
-    System.exit(res);
-  }
-}

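The hand-rolled index arithmetic and month if/else chain in LogMapClass above can be expressed with SimpleDateFormat. The helper below is a hypothetical alternative, not the demo's code; it assumes the same 16/Dec/2012:05:32:50 -0500 timestamp layout and, like the mapper, keeps the hour exactly as written in the log by ignoring the zone offset.

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Locale;

public class LogHourParser
{
  public static Calendar hourBucket(String bracketedDate) throws ParseException
  {
    // parse(String) stops once the pattern is satisfied, so the trailing " -0500" offset is ignored.
    SimpleDateFormat fmt = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss", Locale.ENGLISH);
    Calendar calendar = Calendar.getInstance();
    calendar.setTime(fmt.parse(bracketedDate));
    // Zero out everything below the hour so all entries from the same hour collapse to one key.
    calendar.set(Calendar.MINUTE, 0);
    calendar.set(Calendar.SECOND, 0);
    calendar.set(Calendar.MILLISECOND, 0);
    return calendar;
  }
}
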
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/LogsCountApplication.java
----------------------------------------------------------------------
diff --git a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/LogsCountApplication.java b/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/LogsCountApplication.java
deleted file mode 100644
index 2d647ed..0000000
--- a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/LogsCountApplication.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.demos.mroperator;
-
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.TextInputFormat;
-
-import com.datatorrent.api.annotation.ApplicationAnnotation;
-
-/**
- * <p>LogsCountApplication class.</p>
- *
- * @since 0.9.0
- */
-@ApplicationAnnotation(name = "LogsCountDemo")
-public class LogsCountApplication extends MapReduceApplication<LongWritable, Text, DateWritable, IntWritable>
-{
-
-  public void LogsCountApplication()
-  {
-    setMapClass(LogCountsPerHour.LogMapClass.class);
-    // setCombineClass(LogCountsPerHour.LogReduce.class);
-    setReduceClass(LogCountsPerHour.LogReduce.class);
-    setInputFormat(TextInputFormat.class);
-
-  }
-
-}

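One detail worth flagging in the file above: public void LogsCountApplication() declares an ordinary method, not a constructor, so the map/reduce wiring never runs when the application object is instantiated (NewWordCountApplication further down follows the same pattern). If the demo were revived, a real constructor would simply drop the return type; the class below is a sketch of that fix and is not part of this commit.

package com.datatorrent.demos.mroperator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.TextInputFormat;

import com.datatorrent.api.annotation.ApplicationAnnotation;

@ApplicationAnnotation(name = "LogsCountDemo")
public class LogsCountApplication extends MapReduceApplication<LongWritable, Text, DateWritable, IntWritable>
{
  // A true constructor (no "void" return type), so this wiring runs on instantiation.
  public LogsCountApplication()
  {
    setMapClass(LogCountsPerHour.LogMapClass.class);
    setReduceClass(LogCountsPerHour.LogReduce.class);
    setInputFormat(TextInputFormat.class);
  }
}
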
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/MapOperator.java
----------------------------------------------------------------------
diff --git a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/MapOperator.java b/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/MapOperator.java
deleted file mode 100644
index 509f6ae..0000000
--- a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/MapOperator.java
+++ /dev/null
@@ -1,414 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.demos.mroperator;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-import javax.validation.constraints.Min;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.serializer.Deserializer;
-import org.apache.hadoop.io.serializer.SerializationFactory;
-import org.apache.hadoop.io.serializer.Serializer;
-import org.apache.hadoop.mapred.Counters;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.KeyValueTextInputFormat;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.TextInputFormat;
-
-import com.datatorrent.api.Context.OperatorContext;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.DefaultPartition;
-import com.datatorrent.api.InputOperator;
-import com.datatorrent.api.Partitioner;
-
-import com.datatorrent.demos.mroperator.ReporterImpl.ReporterType;
-import com.datatorrent.lib.util.KeyHashValPair;
-
-/**
- * <p>
- * MapOperator class.
- * </p>
- *
- * @since 0.9.0
- */
-@SuppressWarnings({ "unchecked"})
-public class MapOperator<K1, V1, K2, V2>  implements InputOperator, Partitioner<MapOperator<K1, V1, K2, V2>>
-{
-
-  private static final Logger logger = LoggerFactory.getLogger(MapOperator.class);
-  private String dirName;
-  private boolean emitPartitioningCountOnce = false;
-  private boolean emitLastCountOnce = false;
-  private int operatorId;
-  private Class<? extends InputFormat<K1, V1>> inputFormatClass;
-  private transient InputFormat<K1, V1> inputFormat;
-  private transient InputSplit inputSplit;
-  private Class<? extends InputSplit> inputSplitClass;
-  private ByteArrayOutputStream outstream = new ByteArrayOutputStream();
-  private transient RecordReader<K1, V1> reader;
-  private boolean emittedAll = false;
-  public final transient DefaultOutputPort<KeyHashValPair<Integer, Integer>> outputCount = new DefaultOutputPort<KeyHashValPair<Integer, Integer>>();
-  public final transient DefaultOutputPort<KeyHashValPair<K2, V2>> output = new DefaultOutputPort<KeyHashValPair<K2, V2>>();
-  private transient JobConf jobConf;
-  @Min(1)
-  private int partitionCount = 1;
-
-  public Class<? extends InputSplit> getInputSplitClass()
-  {
-    return inputSplitClass;
-  }
-
-  public void setInputSplitClass(Class<? extends InputSplit> inputSplitClass)
-  {
-    this.inputSplitClass = inputSplitClass;
-  }
-
-  public Class<? extends InputFormat<K1, V1>> getInputFormatClass()
-  {
-    return inputFormatClass;
-  }
-
-  public void setInputFormatClass(Class<? extends InputFormat<K1, V1>> inputFormatClass)
-  {
-    this.inputFormatClass = inputFormatClass;
-  }
-
-  public String getDirName()
-  {
-    return dirName;
-  }
-
-  public void setDirName(String dirName)
-  {
-    this.dirName = dirName;
-  }
-
-  public int getPartitionCount()
-  {
-    return partitionCount;
-  }
-
-  public void setPartitionCount(int partitionCount)
-  {
-    this.partitionCount = partitionCount;
-  }
-
-  @Override
-  public void beginWindow(long windowId)
-  {
-    if (!emitPartitioningCountOnce) {
-      outputCount.emit(new KeyHashValPair<Integer, Integer>(operatorId, 1));
-      emitPartitioningCountOnce = true;
-    }
-    if (reader == null) {
-      try {
-        reader = inputFormat.getRecordReader(inputSplit, new JobConf(new Configuration()), reporter);
-      } catch (IOException e) {
-        logger.info("error getting record reader {}", e.getMessage());
-      }
-    }
-  }
-
-  @Override
-  public void teardown()
-  {
-
-  }
-
-  @Override
-  public void setup(OperatorContext context)
-  {
-    if (context != null) {
-      operatorId = context.getId();
-    }
-    reporter = new ReporterImpl(ReporterType.Mapper, new Counters());
-    outputCollector = new OutputCollectorImpl<K2, V2>();
-    Configuration conf = new Configuration();
-    try {
-      inputFormat = inputFormatClass.newInstance();
-      SerializationFactory serializationFactory = new SerializationFactory(conf);
-      Deserializer keyDesiralizer = serializationFactory.getDeserializer(inputSplitClass);
-      keyDesiralizer.open(new ByteArrayInputStream(outstream.toByteArray()));
-      inputSplit = (InputSplit)keyDesiralizer.deserialize(null);
-      ((ReporterImpl)reporter).setInputSplit(inputSplit);
-      reader = inputFormat.getRecordReader(inputSplit, new JobConf(conf), reporter);
-    } catch (Exception e) {
-      logger.info("failed to initialize inputformat obj {}", inputFormat);
-      throw new RuntimeException(e);
-    }
-    InputStream stream = null;
-    if (configFile != null && configFile.length() > 0) {
-      stream = ClassLoader.getSystemResourceAsStream("/" + configFile);
-      if (stream == null) {
-        stream = ClassLoader.getSystemResourceAsStream(configFile);
-      }
-    }
-    if (stream != null) {
-      conf.addResource(stream);
-    }
-    jobConf = new JobConf(conf);
-    if (mapClass != null) {
-      try {
-        mapObject = mapClass.newInstance();
-      } catch (Exception e) {
-        logger.info("can't instantiate object {}", e.getMessage());
-      }
-
-      mapObject.configure(jobConf);
-    }
-    if (combineClass != null) {
-      try {
-        combineObject = combineClass.newInstance();
-      } catch (Exception e) {
-        logger.info("can't instantiate object {}", e.getMessage());
-      }
-      combineObject.configure(jobConf);
-    }
-  }
-
-  @Override
-  public void emitTuples()
-  {
-    if (!emittedAll) {
-      try {
-        K1 key = reader.createKey();
-        V1 val = reader.createValue();
-        emittedAll = !reader.next(key, val);
-        if (!emittedAll) {
-          KeyHashValPair<K1, V1> keyValue = new KeyHashValPair<K1, V1>(key, val);
-          mapObject.map(keyValue.getKey(), keyValue.getValue(), outputCollector, reporter);
-          if (combineObject == null) {
-            List<KeyHashValPair<K2, V2>> list = ((OutputCollectorImpl<K2, V2>)outputCollector).getList();
-            for (KeyHashValPair<K2, V2> e : list) {
-              output.emit(e);
-            }
-            list.clear();
-          }
-        }
-      } catch (IOException ex) {
-        logger.debug(ex.toString());
-        throw new RuntimeException(ex);
-      }
-    }
-  }
-
-  @Override
-  public void endWindow()
-  {
-    List<KeyHashValPair<K2, V2>> list = ((OutputCollectorImpl<K2, V2>)outputCollector).getList();
-    if (combineObject != null) {
-      Map<K2, List<V2>> cacheObject = new HashMap<K2, List<V2>>();
-      for (KeyHashValPair<K2, V2> tuple : list) {
-        List<V2> cacheList = cacheObject.get(tuple.getKey());
-        if (cacheList == null) {
-          cacheList = new ArrayList<V2>();
-          cacheList.add(tuple.getValue());
-          cacheObject.put(tuple.getKey(), cacheList);
-        } else {
-          cacheList.add(tuple.getValue());
-        }
-      }
-      list.clear();
-      OutputCollector<K2, V2> tempOutputCollector = new OutputCollectorImpl<K2, V2>();
-      for (Map.Entry<K2, List<V2>> e : cacheObject.entrySet()) {
-        try {
-          combineObject.reduce(e.getKey(), e.getValue().iterator(), tempOutputCollector, reporter);
-        } catch (IOException e1) {
-          logger.info(e1.getMessage());
-        }
-      }
-      list = ((OutputCollectorImpl<K2, V2>)tempOutputCollector).getList();
-      for (KeyHashValPair<K2, V2> e : list) {
-        output.emit(e);
-      }
-    }
-    if (!emitLastCountOnce && emittedAll) {
-      outputCount.emit(new KeyHashValPair<Integer, Integer>(operatorId, -1));
-      logger.info("emitting end of file {}", new KeyHashValPair<Integer, Integer>(operatorId, -1));
-      emitLastCountOnce = true;
-    }
-    list.clear();
-  }
-
-  private InputSplit[] getSplits(JobConf conf, int numSplits, String path) throws Exception
-  {
-    FileInputFormat.setInputPaths(conf, new Path(path));
-    if (inputFormat == null) {
-      inputFormat = inputFormatClass.newInstance();
-      String inputFormatClassName = inputFormatClass.getName();
-      if (inputFormatClassName.equals("org.apache.hadoop.mapred.TextInputFormat")) {
-        ((TextInputFormat)inputFormat).configure(conf);
-      } else if (inputFormatClassName.equals("org.apache.hadoop.mapred.KeyValueTextInputFormat")) {
-        ((KeyValueTextInputFormat)inputFormat).configure(conf);
-      }
-    }
-    return inputFormat.getSplits(conf, numSplits);
-    // return null;
-  }
-
-  @Override
-  public void partitioned(Map<Integer, Partition<MapOperator<K1, V1, K2, V2>>> partitions)
-  {
-  }
-
-  @SuppressWarnings("rawtypes")
-  @Override
-  public Collection<Partition<MapOperator<K1, V1, K2, V2>>> definePartitions(Collection<Partition<MapOperator<K1, V1, K2, V2>>> partitions, PartitioningContext context)
-  {
-    int tempPartitionCount = partitionCount;
-
-    Collection c = partitions;
-    Collection<Partition<MapOperator<K1, V1, K2, V2>>> operatorPartitions = c;
-    Partition<MapOperator<K1, V1, K2, V2>> template;
-    Iterator<Partition<MapOperator<K1, V1, K2, V2>>> itr = operatorPartitions.iterator();
-    template = itr.next();
-    Configuration conf = new Configuration();
-    SerializationFactory serializationFactory = new SerializationFactory(conf);
-    if (outstream.size() == 0) {
-      InputSplit[] splits;
-      try {
-        splits = getSplits(new JobConf(conf), tempPartitionCount, template.getPartitionedInstance().getDirName());
-      } catch (Exception e1) {
-        logger.info(" can't get splits {}", e1.getMessage());
-        throw new RuntimeException(e1);
-      }
-      Collection<Partition<MapOperator<K1, V1, K2, V2>>> operList = new ArrayList<Partition<MapOperator<K1, V1, K2, V2>>>();
-      itr = operatorPartitions.iterator();
-      int size = splits.length;
-      Serializer keySerializer = serializationFactory.getSerializer(splits[0].getClass());
-      while (size > 0 && itr.hasNext()) {
-        Partition<MapOperator<K1, V1, K2, V2>> p = itr.next();
-        MapOperator<K1, V1, K2, V2> opr = p.getPartitionedInstance();
-        opr.setInputFormatClass(inputFormatClass);
-        opr.setMapClass(mapClass);
-        opr.setCombineClass(combineClass);
-        opr.setConfigFile(configFile);
-        try {
-          keySerializer.open(opr.getOutstream());
-          keySerializer.serialize(splits[size - 1]);
-          opr.setInputSplitClass(splits[size - 1].getClass());
-        } catch (IOException e) {
-          logger.info("error while serializing {}", e.getMessage());
-        }
-        size--;
-        operList.add(p);
-      }
-      while (size > 0) {
-        MapOperator<K1, V1, K2, V2> opr = new MapOperator<K1, V1, K2, V2>();
-        opr.setInputFormatClass(inputFormatClass);
-        opr.setMapClass(mapClass);
-        opr.setCombineClass(combineClass);
-        opr.setConfigFile(configFile);
-        try {
-          keySerializer.open(opr.getOutstream());
-          keySerializer.serialize(splits[size - 1]);
-          opr.setInputSplitClass(splits[size - 1].getClass());
-        } catch (IOException e) {
-          logger.info("error while serializing {}", e.getMessage());
-        }
-        size--;
-        operList.add(new DefaultPartition<MapOperator<K1, V1, K2, V2>>(opr));
-      }
-      try {
-        keySerializer.close();
-      } catch (IOException e) {
-        throw new RuntimeException(e);
-      }
-      return operList;
-    }
-    return null;
-  }
-
-  public ByteArrayOutputStream getOutstream()
-  {
-    return outstream;
-  }
-
-  public void setOutstream(ByteArrayOutputStream outstream)
-  {
-    this.outstream = outstream;
-  }
-
-  /**
-   * adding map code
-   */
-
-  private Class<? extends Mapper<K1, V1, K2, V2>> mapClass;
-  private Class<? extends Reducer<K2, V2, K2, V2>> combineClass;
-
-  private transient Mapper<K1, V1, K2, V2> mapObject;
-  private transient Reducer<K2, V2, K2, V2> combineObject;
-  private transient Reporter reporter;
-
-  private String configFile;
-
-  public String getConfigFile()
-  {
-    return configFile;
-  }
-
-  public void setConfigFile(String configFile)
-  {
-    this.configFile = configFile;
-  }
-
-  private transient OutputCollector<K2, V2> outputCollector;
-
-  public Class<? extends Mapper<K1, V1, K2, V2>> getMapClass()
-  {
-    return mapClass;
-  }
-
-  public void setMapClass(Class<? extends Mapper<K1, V1, K2, V2>> mapClass)
-  {
-    this.mapClass = mapClass;
-  }
-
-  public Class<? extends Reducer<K2, V2, K2, V2>> getCombineClass()
-  {
-    return combineClass;
-  }
-
-  public void setCombineClass(Class<? extends Reducer<K2, V2, K2, V2>> combineClass)
-  {
-    this.combineClass = combineClass;
-  }
-
-}

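MapOperator above carries all of its wiring in plain setters (input directory, input format, mapper, combiner, partition count) and, in definePartitions(), repartitions itself into one instance per input split, serializing each split into the operator's byte stream. The snippet below is a hypothetical sketch of that wiring for the WordCount mapper/reducer referenced by NewWordCountApplication later in this diff; the directory path and partition count are assumptions, not values from the commit.

package com.datatorrent.demos.mroperator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.TextInputFormat;

import com.datatorrent.api.DAG;

public class MapOperatorWiring
{
  public static MapOperator<LongWritable, Text, Text, IntWritable> addWordCountMapper(DAG dag)
  {
    MapOperator<LongWritable, Text, Text, IntWritable> mapper =
        dag.addOperator("Mapper", new MapOperator<LongWritable, Text, Text, IntWritable>());
    mapper.setDirName("/user/demo/wordcount/input");   // input directory; the path is an assumption
    mapper.setInputFormatClass(TextInputFormat.class); // old mapred API input format
    mapper.setMapClass(WordCount.Map.class);
    mapper.setCombineClass(WordCount.Reduce.class);
    mapper.setPartitionCount(2);                       // requested split count; one partition per split
    return mapper;
  }
}
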
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/MapReduceApplication.java
----------------------------------------------------------------------
diff --git a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/MapReduceApplication.java b/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/MapReduceApplication.java
deleted file mode 100644
index 9fffc3f..0000000
--- a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/MapReduceApplication.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.demos.mroperator;
-
-import java.util.StringTokenizer;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.Reducer;
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.StreamingApplication;
-import com.datatorrent.api.annotation.ApplicationAnnotation;
-
-
-/**
- * <p>
- * Abstract MapReduceApplication class.
- * </p>
- *
- * @since 0.9.0
- */
-@ApplicationAnnotation(name = "MapReduceDemo")
-public abstract class MapReduceApplication<K1, V1, K2, V2> implements StreamingApplication
-{
-  Class<? extends InputFormat<K1, V1>> inputFormat;
-  Class<? extends Mapper<K1, V1, K2, V2>> mapClass;
-  Class<? extends Reducer<K2, V2, K2, V2>> reduceClass;
-  Class<? extends Reducer<K2, V2, K2, V2>> combineClass;
-
-  public Class<? extends Reducer<K2, V2, K2, V2>> getCombineClass()
-  {
-    return combineClass;
-  }
-
-  public void setCombineClass(Class<? extends Reducer<K2, V2, K2, V2>> combineClass)
-  {
-    this.combineClass = combineClass;
-  }
-
-  public void setInputFormat(Class<? extends InputFormat<K1, V1>> inputFormat)
-  {
-    this.inputFormat = inputFormat;
-  }
-
-  public Class<? extends Mapper<K1, V1, K2, V2>> getMapClass()
-  {
-    return mapClass;
-  }
-
-  public void setMapClass(Class<? extends Mapper<K1, V1, K2, V2>> mapClass)
-  {
-    this.mapClass = mapClass;
-  }
-
-  public Class<? extends Reducer<K2, V2, K2, V2>> getReduceClass()
-  {
-    return reduceClass;
-  }
-
-  public void setReduceClass(Class<? extends Reducer<K2, V2, K2, V2>> reduceClass)
-  {
-    this.reduceClass = reduceClass;
-  }
-
-
-  @Override
-  public void populateDAG(DAG dag, Configuration conf)
-  {
-    String configurationFilePath = conf.get(this.getClass().getSimpleName() + ".configFile", "");
-
-    MapOperator<K1, V1, K2, V2> inputOperator = dag.addOperator("Mapper", new MapOperator<K1, V1, K2, V2>());
-    inputOperator.setInputFormatClass(inputFormat);
-
-    String configFileName = null;
-    if (configurationFilePath != null && !configurationFilePath.isEmpty()) {
-      StringTokenizer configFileTokenizer = new StringTokenizer(configurationFilePath, "/");
-      configFileName = configFileTokenizer.nextToken();
-      while (configFileTokenizer.hasMoreTokens()) {
-        configFileName = configFileTokenizer.nextToken();
-      }
-    }
-
-    inputOperator.setMapClass(mapClass);
-    inputOperator.setConfigFile(configFileName);
-    inputOperator.setCombineClass(combineClass);
-
-    ReduceOperator<K2, V2, K2, V2> reduceOpr = dag.addOperator("Reducer", new ReduceOperator<K2, V2, K2, V2>());
-    reduceOpr.setReduceClass(reduceClass);
-    reduceOpr.setConfigFile(configFileName);
-
-    HdfsKeyValOutputOperator<K2, V2> console = dag.addOperator("Console", new HdfsKeyValOutputOperator<K2, V2>());
-
-    dag.addStream("Mapped-Output", inputOperator.output, reduceOpr.input);
-    dag.addStream("Mapper-Count", inputOperator.outputCount, reduceOpr.inputCount);
-    dag.addStream("Reduced-Output", reduceOpr.output, console.input);
-  }
-}

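populateDAG() above pulls an optional Hadoop configuration file from the launch Configuration under the key "<simple class name>.configFile" and forwards only the trailing file name to the Mapper and Reducer operators, which then resolve it as a classpath resource in setup(). A hypothetical way to supply that property is sketched below; the key suffix comes from the code above, while the values are assumptions.

import org.apache.hadoop.conf.Configuration;

public class ConfigFileExample
{
  public static void main(String[] args)
  {
    Configuration launchConf = new Configuration(false);
    // For the WordCountDemo subclass the simple class name is NewWordCountApplication.
    launchConf.set("NewWordCountApplication.configFile", "conf/mr-wordcount.xml");
    // populateDAG() keeps only the last path segment ("mr-wordcount.xml"), so the file
    // has to be reachable as a classpath resource in the operator containers.
    System.out.println(launchConf.get("NewWordCountApplication.configFile"));
  }
}
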
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/NewWordCountApplication.java
----------------------------------------------------------------------
diff --git a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/NewWordCountApplication.java b/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/NewWordCountApplication.java
deleted file mode 100644
index b0ea7d8..0000000
--- a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/NewWordCountApplication.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.demos.mroperator;
-
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.TextInputFormat;
-
-import com.datatorrent.api.annotation.ApplicationAnnotation;
-
-/**
- * <p>NewWordCountApplication class.</p>
- *
- * @since 0.9.0
- */
-@ApplicationAnnotation(name = "WordCountDemo")
-public class NewWordCountApplication extends MapReduceApplication<LongWritable, Text, Text, IntWritable>
-{
-
-  public void NewWordCountApplication()
-  {
-    setMapClass(WordCount.Map.class);
-    setReduceClass(WordCount.Reduce.class);
-    setCombineClass(WordCount.Reduce.class);
-    setInputFormat(TextInputFormat.class);
-  }
-}
