http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java
new file mode 100644
index 0000000..41a371b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java
@@ -0,0 +1,277 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScanType;
+import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineServerUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimestampGenerator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Coprocessor for flow run table.
+ */
+public class FlowRunCoprocessor extends BaseRegionObserver {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(FlowRunCoprocessor.class);
+
+  private Region region;
+  /**
+   * Generates a timestamp that is unique per row within a region; there is
+   * one generator per region.
+   */
+  private final TimestampGenerator timestampGenerator =
+      new TimestampGenerator();
+
+  @Override
+  public void start(CoprocessorEnvironment e) throws IOException {
+    if (e instanceof RegionCoprocessorEnvironment) {
+      RegionCoprocessorEnvironment env = (RegionCoprocessorEnvironment) e;
+      this.region = env.getRegion();
+    }
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * This method adds the tags onto the cells in the Put. It is presumed that
+   * all the cells in one Put have the same set of Tags. The existing cell
+   * timestamp is overwritten for non-metric cells and each such cell gets a
+   * new unique timestamp generated by {@link TimestampGenerator}.
+   *
+   * @see
+   * org.apache.hadoop.hbase.coprocessor.BaseRegionObserver#prePut(org.apache
+   * .hadoop.hbase.coprocessor.ObserverContext,
+   * org.apache.hadoop.hbase.client.Put,
+   * org.apache.hadoop.hbase.regionserver.wal.WALEdit,
+   * org.apache.hadoop.hbase.client.Durability)
+   */
+  @Override
+  public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put,
+      WALEdit edit, Durability durability) throws IOException {
+    Map<String, byte[]> attributes = put.getAttributesMap();
+    // Assumption is that all the cells in a put are the same operation.
+    List<Tag> tags = new ArrayList<>();
+    if ((attributes != null) && (attributes.size() > 0)) {
+      for (Map.Entry<String, byte[]> attribute : attributes.entrySet()) {
+        Tag t = HBaseTimelineServerUtils.getTagFromAttribute(attribute);
+        if (t != null) {
+          tags.add(t);
+        }
+      }
+      byte[] tagByteArray = Tag.fromList(tags);
+      NavigableMap<byte[], List<Cell>> newFamilyMap = new TreeMap<>(
+          Bytes.BYTES_COMPARATOR);
+      for (Map.Entry<byte[], List<Cell>> entry : put.getFamilyCellMap()
+          .entrySet()) {
+        List<Cell> newCells = new ArrayList<>(entry.getValue().size());
+        for (Cell cell : entry.getValue()) {
+          // for each cell in the put add the tags
+          // Assumption is that all the cells in
+          // one put are the same operation
+          // also, get a unique cell timestamp for non-metric cells
+          // this way we don't inadvertently overwrite cell versions
+          long cellTimestamp = getCellTimestamp(cell.getTimestamp(), tags);
+          newCells.add(CellUtil.createCell(CellUtil.cloneRow(cell),
+              CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell),
+              cellTimestamp, KeyValue.Type.Put, CellUtil.cloneValue(cell),
+              tagByteArray));
+        }
+        newFamilyMap.put(entry.getKey(), newCells);
+      } // for each entry
+      // Update the family map for the Put
+      put.setFamilyCellMap(newFamilyMap);
+    }
+  }
+
+  /**
+   * Determines whether the current cell's timestamp is to be used or a new
+   * unique cell timestamp is to be generated. This avoids inadvertently
+   * overwriting cell versions when writes come in very fast. But for metric
+   * cells, the cell timestamp signifies the metric timestamp, so we don't
+   * want to overwrite it.
+   *
+   * @param timestamp the timestamp already set on the cell.
+   * @param tags the tags set on the cell.
+   * @return the cell timestamp to use.
+   */
+  private long getCellTimestamp(long timestamp, List<Tag> tags) {
+    // if ts not set (hbase sets to HConstants.LATEST_TIMESTAMP by default)
+    // then use the generator
+    if (timestamp == HConstants.LATEST_TIMESTAMP) {
+      return timestampGenerator.getUniqueTimestamp();
+    } else {
+      return timestamp;
+    }
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * Creates a {@link FlowScanner} Scan so that it can correctly process the
+   * contents of {@link FlowRunTable}.
+   *
+   * @see
+   * org.apache.hadoop.hbase.coprocessor.BaseRegionObserver#preGetOp(org.apache
+   * .hadoop.hbase.coprocessor.ObserverContext,
+   * org.apache.hadoop.hbase.client.Get, java.util.List)
+   */
+  @Override
+  public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> e,
+      Get get, List<Cell> results) throws IOException {
+    Scan scan = new Scan(get);
+    scan.setMaxVersions();
+    RegionScanner scanner = null;
+    try {
+      scanner = new FlowScanner(e.getEnvironment(), scan,
+          region.getScanner(scan), FlowScannerOperation.READ);
+      scanner.next(results);
+      e.bypass();
+    } finally {
+      if (scanner != null) {
+        scanner.close();
+      }
+    }
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * Ensures that max versions are set for the Scan so that metrics can be
+   * correctly aggregated and min/max can be correctly determined.
+   *
+   * @see
+   * org.apache.hadoop.hbase.coprocessor.BaseRegionObserver#preScannerOpen(org
+   * .apache.hadoop.hbase.coprocessor.ObserverContext,
+   * org.apache.hadoop.hbase.client.Scan,
+   * org.apache.hadoop.hbase.regionserver.RegionScanner)
+   */
+  @Override
+  public RegionScanner preScannerOpen(
+      ObserverContext<RegionCoprocessorEnvironment> e, Scan scan,
+      RegionScanner scanner) throws IOException {
+    // set max versions for scan to see all
+    // versions to aggregate for metrics
+    scan.setMaxVersions();
+    return scanner;
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * Creates a {@link FlowScanner} Scan so that it can correctly process the
+   * contents of {@link FlowRunTable}.
+   *
+   * @see
+   * org.apache.hadoop.hbase.coprocessor.BaseRegionObserver#postScannerOpen(
+   * org.apache.hadoop.hbase.coprocessor.ObserverContext,
+   * org.apache.hadoop.hbase.client.Scan,
+   * org.apache.hadoop.hbase.regionserver.RegionScanner)
+   */
+  @Override
+  public RegionScanner postScannerOpen(
+      ObserverContext<RegionCoprocessorEnvironment> e, Scan scan,
+      RegionScanner scanner) throws IOException {
+    return new FlowScanner(e.getEnvironment(), scan,
+        scanner, FlowScannerOperation.READ);
+  }
+
+  @Override
+  public InternalScanner preFlush(
+      ObserverContext<RegionCoprocessorEnvironment> c, Store store,
+      InternalScanner scanner) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      if (store != null) {
+        LOG.debug("preFlush store = " + store.getColumnFamilyName()
+            + " flushableSize=" + store.getFlushableSize()
+            + " flushedCellsCount=" + store.getFlushedCellsCount()
+            + " compactedCellsCount=" + store.getCompactedCellsCount()
+            + " majorCompactedCellsCount="
+            + store.getMajorCompactedCellsCount() + " memstoreFlushSize="
+            + store.getMemstoreFlushSize() + " memstoreSize="
+            + store.getMemStoreSize() + " size=" + store.getSize()
+            + " storeFilesCount=" + store.getStorefilesCount());
+      }
+    }
+    return new FlowScanner(c.getEnvironment(), scanner,
+        FlowScannerOperation.FLUSH);
+  }
+
+  @Override
+  public void postFlush(ObserverContext<RegionCoprocessorEnvironment> c,
+      Store store, StoreFile resultFile) {
+    if (LOG.isDebugEnabled()) {
+      if (store != null) {
+        LOG.debug("postFlush store = " + store.getColumnFamilyName()
+            + " flushableSize=" + store.getFlushableSize()
+            + " flushedCellsCount=" + store.getFlushedCellsCount()
+            + " compactedCellsCount=" + store.getCompactedCellsCount()
+            + " majorCompactedCellsCount="
+            + store.getMajorCompactedCellsCount() + " memstoreFlushSize="
+            + store.getMemstoreFlushSize() + " memstoreSize="
+            + store.getMemStoreSize() + " size=" + store.getSize()
+            + " storeFilesCount=" + store.getStorefilesCount());
+      }
+    }
+  }
+
+  @Override
+  public InternalScanner preCompact(
+      ObserverContext<RegionCoprocessorEnvironment> e, Store store,
+      InternalScanner scanner, ScanType scanType, CompactionRequest request)
+      throws IOException {
+
+    FlowScannerOperation requestOp = FlowScannerOperation.MINOR_COMPACTION;
+    if (request != null) {
+      requestOp = (request.isMajor() ? FlowScannerOperation.MAJOR_COMPACTION
+          : FlowScannerOperation.MINOR_COMPACTION);
+      LOG.info("Compactionrequest= " + request.toString() + " "
+          + requestOp.toString() + " RegionName=" + e.getEnvironment()
+              .getRegion().getRegionInfo().getRegionNameAsString());
+    }
+    return new FlowScanner(e.getEnvironment(), scanner, requestOp);
+  }
+}
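
For reference, the coprocessor above only takes effect once it is registered
on the flow run table. A minimal setup sketch, assuming an HBase 1.x Admin
handle; the table name "prod.timelineservice.flowrun" and the "i" column
family are illustrative assumptions, not taken from this patch:

    import java.io.IOException;
    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Admin;

    // Sketch: create the flow run table with FlowRunCoprocessor attached so
    // that its prePut/preGetOp/preFlush/preCompact hooks fire for the table.
    void createFlowRunTable(Admin admin) throws IOException {
      // table name and column family below are assumptions for illustration
      HTableDescriptor desc = new HTableDescriptor(
          TableName.valueOf("prod.timelineservice.flowrun"));
      desc.addFamily(new HColumnDescriptor("i"));
      desc.addCoprocessor(FlowRunCoprocessor.class.getCanonicalName());
      admin.createTable(desc);
    }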

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
new file mode 100644
index 0000000..7f09e51
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
@@ -0,0 +1,723 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScannerContext;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Bytes.ByteArrayComparator;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.GenericConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineServerUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.NumericValueConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimestampGenerator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Invoked via the coprocessor when a Get or a Scan is issued for the flow
+ * run table. Looks through the list of cells per row, checks their tags and
+ * operates on those cells as per the cell tags. Transforms reads of the
+ * stored metrics into calculated sums for each column. Also finds the min
+ * and max for start and end times in a flow run.
+ */
+class FlowScanner implements RegionScanner, Closeable {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(FlowScanner.class);
+
+  /**
+   * Use a special application id to represent the flow id; this is needed
+   * since TimestampGenerator parses the app id to generate a cell timestamp.
+   */
+  private static final String FLOW_APP_ID = "application_00000000000_0000";
+
+  private final Region region;
+  private final InternalScanner flowRunScanner;
+  private final int batchSize;
+  private final long appFinalValueRetentionThreshold;
+  private RegionScanner regionScanner;
+  private boolean hasMore;
+  private byte[] currentRow;
+  private List<Cell> availableCells = new ArrayList<>();
+  private int currentIndex;
+  private FlowScannerOperation action = FlowScannerOperation.READ;
+
+  FlowScanner(RegionCoprocessorEnvironment env,
+      InternalScanner internalScanner, FlowScannerOperation action) {
+    this(env, null, internalScanner, action);
+  }
+
+  FlowScanner(RegionCoprocessorEnvironment env, Scan incomingScan,
+      InternalScanner internalScanner, FlowScannerOperation action) {
+    this.batchSize = incomingScan == null ? -1 : incomingScan.getBatch();
+    // TODO initialize other scan attributes like Scan#maxResultSize
+    this.flowRunScanner = internalScanner;
+    if (internalScanner instanceof RegionScanner) {
+      this.regionScanner = (RegionScanner) internalScanner;
+    }
+    this.action = action;
+    if (env == null) {
+      this.appFinalValueRetentionThreshold =
+          YarnConfiguration.DEFAULT_APP_FINAL_VALUE_RETENTION_THRESHOLD;
+      this.region = null;
+    } else {
+      this.region = env.getRegion();
+      Configuration hbaseConf = env.getConfiguration();
+      this.appFinalValueRetentionThreshold = hbaseConf.getLong(
+          YarnConfiguration.APP_FINAL_VALUE_RETENTION_THRESHOLD,
+          YarnConfiguration.DEFAULT_APP_FINAL_VALUE_RETENTION_THRESHOLD);
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(" batch size=" + batchSize);
+    }
+  }
+
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see org.apache.hadoop.hbase.regionserver.RegionScanner#getRegionInfo()
+   */
+  @Override
+  public HRegionInfo getRegionInfo() {
+    return region.getRegionInfo();
+  }
+
+  @Override
+  public boolean nextRaw(List<Cell> cells) throws IOException {
+    return nextRaw(cells, ScannerContext.newBuilder().build());
+  }
+
+  @Override
+  public boolean nextRaw(List<Cell> cells, ScannerContext scannerContext)
+      throws IOException {
+    return nextInternal(cells, scannerContext);
+  }
+
+  @Override
+  public boolean next(List<Cell> cells) throws IOException {
+    return next(cells, ScannerContext.newBuilder().build());
+  }
+
+  @Override
+  public boolean next(List<Cell> cells, ScannerContext scannerContext)
+      throws IOException {
+    return nextInternal(cells, scannerContext);
+  }
+
+  /**
+   * Gets the value converter associated with a column or a column prefix. If
+   * nothing matches, the generic converter is returned.
+   * @param colQualifierBytes the column qualifier to find a converter for.
+   * @return value converter implementation.
+   */
+  private static ValueConverter getValueConverter(byte[] colQualifierBytes) {
+    // Iterate over all the column prefixes for flow run table and get the
+    // appropriate converter for the column qualifier passed if prefix matches.
+    for (FlowRunColumnPrefix colPrefix : FlowRunColumnPrefix.values()) {
+      byte[] colPrefixBytes = colPrefix.getColumnPrefixBytes("");
+      if (Bytes.compareTo(colPrefixBytes, 0, colPrefixBytes.length,
+          colQualifierBytes, 0, colPrefixBytes.length) == 0) {
+        return colPrefix.getValueConverter();
+      }
+    }
+    // Iterate over all the columns for flow run table and get the
+    // appropriate converter for the column qualifier passed if match occurs.
+    for (FlowRunColumn column : FlowRunColumn.values()) {
+      if (Bytes.compareTo(
+          column.getColumnQualifierBytes(), colQualifierBytes) == 0) {
+        return column.getValueConverter();
+      }
+    }
+    // Return generic converter if nothing matches.
+    return GenericConverter.getInstance();
+  }
+
+  /**
+   * This method loops through the cells in a given row of the
+   * {@link FlowRunTable}. It looks at the tags of each cell to figure out how
+   * to process the contents. It then calculates the sum or min or max for each
+   * column or returns the cell as is.
+   *
+   * @param cells output list that the processed cells for the row are
+   *          added to.
+   * @param scannerContext context for the batch of cells under consideration.
+   * @return true if the next row is available for the scanner, false
+   *         otherwise.
+   * @throws IOException if the underlying scanner fails.
+   */
+  private boolean nextInternal(List<Cell> cells, ScannerContext scannerContext)
+      throws IOException {
+    Cell cell = null;
+    startNext();
+    // Loop through all the cells in this row.
+    // For min/max/metrics we do need to scan the entire set of cells to get
+    // the right one, but with flush/compaction, the number of cells being
+    // scanned will go down.
+    // Cells are grouped per column qualifier and then sorted by cell
+    // timestamp (latest to oldest) per column qualifier, so all cells in one
+    // qualifier come one after the other before we see the next column
+    // qualifier.
+    ByteArrayComparator comp = new ByteArrayComparator();
+    byte[] previousColumnQualifier = Separator.EMPTY_BYTES;
+    AggregationOperation currentAggOp = null;
+    SortedSet<Cell> currentColumnCells = new TreeSet<>(KeyValue.COMPARATOR);
+    Set<String> alreadySeenAggDim = new HashSet<>();
+    int addedCnt = 0;
+    long currentTimestamp = System.currentTimeMillis();
+    ValueConverter converter = null;
+    int limit = batchSize;
+
+    while (limit <= 0 || addedCnt < limit) {
+      cell = peekAtNextCell(scannerContext);
+      if (cell == null) {
+        break;
+      }
+      byte[] currentColumnQualifier = CellUtil.cloneQualifier(cell);
+      if (previousColumnQualifier == null) {
+        // first time in loop
+        previousColumnQualifier = currentColumnQualifier;
+      }
+
+      converter = getValueConverter(currentColumnQualifier);
+      if (comp.compare(previousColumnQualifier, currentColumnQualifier) != 0) {
+        addedCnt += emitCells(cells, currentColumnCells, currentAggOp,
+            converter, currentTimestamp);
+        resetState(currentColumnCells, alreadySeenAggDim);
+        previousColumnQualifier = currentColumnQualifier;
+        currentAggOp = getCurrentAggOp(cell);
+        converter = getValueConverter(currentColumnQualifier);
+      }
+      collectCells(currentColumnCells, currentAggOp, cell, alreadySeenAggDim,
+          converter, scannerContext);
+      nextCell(scannerContext);
+    }
+    if (!currentColumnCells.isEmpty() && (limit <= 0 || addedCnt < limit)) {
+      addedCnt += emitCells(cells, currentColumnCells, currentAggOp, converter,
+          currentTimestamp);
+      if (LOG.isDebugEnabled()) {
+        if (addedCnt > 0) {
+          LOG.debug("emitted cells. " + addedCnt + " for " + this.action
+              + " rowKey="
+              + FlowRunRowKey.parseRowKey(CellUtil.cloneRow(cells.get(0))));
+        } else {
+          LOG.debug("emitted no cells for " + this.action);
+        }
+      }
+    }
+    return hasMore();
+  }
+
+  private AggregationOperation getCurrentAggOp(Cell cell) {
+    List<Tag> tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(),
+        cell.getTagsLength());
+    // We assume that all the operations for a particular column are the same
+    return HBaseTimelineServerUtils.getAggregationOperationFromTagsList(tags);
+  }
+
+  /**
+   * Resets the parameters to an initialized state for the next loop
+   * iteration.
+   */
+  private void resetState(SortedSet<Cell> currentColumnCells,
+      Set<String> alreadySeenAggDim) {
+    currentColumnCells.clear();
+    alreadySeenAggDim.clear();
+  }
+
+  private void collectCells(SortedSet<Cell> currentColumnCells,
+      AggregationOperation currentAggOp, Cell cell,
+      Set<String> alreadySeenAggDim, ValueConverter converter,
+      ScannerContext scannerContext) throws IOException {
+
+    if (currentAggOp == null) {
+      // not a min/max/metric cell, so just return it as is
+      currentColumnCells.add(cell);
+      return;
+    }
+
+    switch (currentAggOp) {
+    case GLOBAL_MIN:
+      if (currentColumnCells.size() == 0) {
+        currentColumnCells.add(cell);
+      } else {
+        Cell currentMinCell = currentColumnCells.first();
+        Cell newMinCell = compareCellValues(currentMinCell, cell, currentAggOp,
+            (NumericValueConverter) converter);
+        if (!currentMinCell.equals(newMinCell)) {
+          currentColumnCells.remove(currentMinCell);
+          currentColumnCells.add(newMinCell);
+        }
+      }
+      break;
+    case GLOBAL_MAX:
+      if (currentColumnCells.size() == 0) {
+        currentColumnCells.add(cell);
+      } else {
+        Cell currentMaxCell = currentColumnCells.first();
+        Cell newMaxCell = compareCellValues(currentMaxCell, cell, currentAggOp,
+            (NumericValueConverter) converter);
+        if (!currentMaxCell.equals(newMaxCell)) {
+          currentColumnCells.remove(currentMaxCell);
+          currentColumnCells.add(newMaxCell);
+        }
+      }
+      break;
+    case SUM:
+    case SUM_FINAL:
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("In collect cells "
+            + " FlowSannerOperation="
+            + this.action
+            + " currentAggOp="
+            + currentAggOp
+            + " cell qualifier="
+            + Bytes.toString(CellUtil.cloneQualifier(cell))
+            + " cell value= "
+            + converter.decodeValue(CellUtil.cloneValue(cell))
+            + " timestamp=" + cell.getTimestamp());
+      }
+
+      // only if this app has not been seen yet, add to current column cells
+      List<Tag> tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(),
+          cell.getTagsLength());
+      String aggDim = HBaseTimelineServerUtils
+          .getAggregationCompactionDimension(tags);
+      if (!alreadySeenAggDim.contains(aggDim)) {
+        // Cells show up in sorted order, so once an agg dimension has been
+        // seen we drop the rest, which are older; in other words, such a
+        // cell is older than the previously seen cell for that agg
+        // dimension. When the agg dimension has not been seen yet, consider
+        // this cell in our working set.
+        currentColumnCells.add(cell);
+        alreadySeenAggDim.add(aggDim);
+      }
+      break;
+    default:
+      break;
+    } // end of switch case
+  }
+
+  /*
+   * Processes the cells in input param currentColumnCells and populates
+   * List<Cell> cells as the output based on the input AggregationOperation
+   * parameter.
+   */
+  private int emitCells(List<Cell> cells, SortedSet<Cell> currentColumnCells,
+      AggregationOperation currentAggOp, ValueConverter converter,
+      long currentTimestamp) throws IOException {
+    if ((currentColumnCells == null) || (currentColumnCells.size() == 0)) {
+      return 0;
+    }
+    if (currentAggOp == null) {
+      cells.addAll(currentColumnCells);
+      return currentColumnCells.size();
+    }
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("In emitCells " + this.action + " currentColumnCells size= "
+          + currentColumnCells.size() + " currentAggOp=" + currentAggOp);
+    }
+
+    switch (currentAggOp) {
+    case GLOBAL_MIN:
+    case GLOBAL_MAX:
+      cells.addAll(currentColumnCells);
+      return currentColumnCells.size();
+    case SUM:
+    case SUM_FINAL:
+      switch (action) {
+      case FLUSH:
+      case MINOR_COMPACTION:
+        cells.addAll(currentColumnCells);
+        return currentColumnCells.size();
+      case READ:
+        Cell sumCell = processSummation(currentColumnCells,
+            (NumericValueConverter) converter);
+        cells.add(sumCell);
+        return 1;
+      case MAJOR_COMPACTION:
+        List<Cell> finalCells = processSummationMajorCompaction(
+            currentColumnCells, (NumericValueConverter) converter,
+            currentTimestamp);
+        cells.addAll(finalCells);
+        return finalCells.size();
+      default:
+        cells.addAll(currentColumnCells);
+        return currentColumnCells.size();
+      }
+    default:
+      cells.addAll(currentColumnCells);
+      return currentColumnCells.size();
+    }
+  }
+
+  /*
+   * Returns a cell whose value is the sum of all cell values in the input set.
+   * The new cell created has the timestamp of the most recent metric cell. The
+   * sum of a metric for a flow run is the summation as of the last metric
+   * update in that flow up to that time.
+   */
+  private Cell processSummation(SortedSet<Cell> currentColumnCells,
+      NumericValueConverter converter) throws IOException {
+    Number sum = 0;
+    Number currentValue = 0;
+    long ts = 0L;
+    long mostCurrentTimestamp = 0L;
+    Cell mostRecentCell = null;
+    for (Cell cell : currentColumnCells) {
+      currentValue = (Number) converter.decodeValue(CellUtil.cloneValue(cell));
+      ts = cell.getTimestamp();
+      if (mostCurrentTimestamp < ts) {
+        mostCurrentTimestamp = ts;
+        mostRecentCell = cell;
+      }
+      sum = converter.add(sum, currentValue);
+    }
+    byte[] sumBytes = converter.encodeValue(sum);
+    Cell sumCell =
+        HBaseTimelineServerUtils.createNewCell(mostRecentCell, sumBytes);
+    return sumCell;
+  }
+
+
+  /**
+   * Returns a list of cells that contains:
+   *
+   * A) the latest cells for applications that haven't finished yet, and
+   * B) a summation for the flow, based on applications that have completed
+   *    and are older than a certain time.
+   *
+   * The new cell created has the timestamp of the most recent metric cell.
+   * The sum of a metric for a flow run is the summation as of the last
+   * metric update in that flow up to that time.
+   */
+  @VisibleForTesting
+  List<Cell> processSummationMajorCompaction(
+      SortedSet<Cell> currentColumnCells, NumericValueConverter converter,
+      long currentTimestamp)
+      throws IOException {
+    Number sum = 0;
+    Number currentValue = 0;
+    long ts = 0L;
+    boolean summationDone = false;
+    List<Cell> finalCells = new ArrayList<Cell>();
+    if (currentColumnCells == null) {
+      return finalCells;
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("In processSummationMajorCompaction,"
+          + " will drop cells older than " + currentTimestamp
+          + " CurrentColumnCells size=" + currentColumnCells.size());
+    }
+
+    for (Cell cell : currentColumnCells) {
+      AggregationOperation cellAggOp = getCurrentAggOp(cell);
+      // if this is the existing flow sum cell
+      List<Tag> tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(),
+          cell.getTagsLength());
+      String appId = HBaseTimelineServerUtils
+          .getAggregationCompactionDimension(tags);
+      if (FLOW_APP_ID.equals(appId)) {
+        // existing flow-sum cell: fold its own value into the running sum
+        // (compare with equals(), not ==, since appId is a new String)
+        currentValue = (Number) converter.decodeValue(
+            CellUtil.cloneValue(cell));
+        sum = converter.add(sum, currentValue);
+        summationDone = true;
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("reading flow app id sum=" + sum);
+        }
+      } else {
+        currentValue = (Number) converter.decodeValue(CellUtil
+            .cloneValue(cell));
+        // read the timestamp truncated by the generator
+        ts = TimestampGenerator.getTruncatedTimestamp(cell.getTimestamp());
+        if ((cellAggOp == AggregationOperation.SUM_FINAL)
+            && ((ts + this.appFinalValueRetentionThreshold)
+                < currentTimestamp)) {
+          sum = converter.add(sum, currentValue);
+          summationDone = true;
+          if (LOG.isTraceEnabled()) {
+            LOG.trace("MAJOR COMPACTION loop sum= " + sum
+                + " discarding now: " + " qualifier="
+                + Bytes.toString(CellUtil.cloneQualifier(cell)) + " value="
+                + converter.decodeValue(CellUtil.cloneValue(cell))
+                + " timestamp=" + cell.getTimestamp() + " " + this.action);
+          }
+        } else {
+          // not a final value but it's the latest cell for this app
+          // so include this cell in the list of cells to write back
+          finalCells.add(cell);
+        }
+      }
+    }
+    if (summationDone) {
+      Cell anyCell = currentColumnCells.first();
+      List<Tag> tags = new ArrayList<Tag>();
+      Tag t = new Tag(AggregationOperation.SUM_FINAL.getTagType(),
+          Bytes.toBytes(FLOW_APP_ID));
+      tags.add(t);
+      t = new Tag(AggregationCompactionDimension.APPLICATION_ID.getTagType(),
+          Bytes.toBytes(FLOW_APP_ID));
+      tags.add(t);
+      byte[] tagByteArray = Tag.fromList(tags);
+      Cell sumCell = HBaseTimelineServerUtils.createNewCell(
+          CellUtil.cloneRow(anyCell),
+          CellUtil.cloneFamily(anyCell),
+          CellUtil.cloneQualifier(anyCell),
+          TimestampGenerator.getSupplementedTimestamp(
+              System.currentTimeMillis(), FLOW_APP_ID),
+              converter.encodeValue(sum), tagByteArray);
+      finalCells.add(sumCell);
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("MAJOR COMPACTION final sum= " + sum + " for "
+            + Bytes.toString(CellUtil.cloneQualifier(sumCell))
+            + " " + this.action);
+      }
+      LOG.info("After major compaction for qualifier="
+          + Bytes.toString(CellUtil.cloneQualifier(sumCell))
+          + " with currentColumnCells.size="
+          + currentColumnCells.size()
+          + " returning finalCells.size=" + finalCells.size()
+          + " with sum=" + sum.longValue()
+          + " with cell timestamp " + sumCell.getTimestamp());
+    } else {
+      String qualifier = Bytes.toString(
+          CellUtil.cloneQualifier(currentColumnCells.first()));
+      LOG.info("After major compaction for qualifier=" + qualifier
+          + " with currentColumnCells.size="
+          + currentColumnCells.size()
+          + " returning finalCells.size=" + finalCells.size()
+          + " with zero sum="
+          + sum.longValue());
+    }
+    return finalCells;
+  }
+
+  /**
+   * Determines which cell is to be returned based on the values in each cell
+   * and the comparison operation MIN or MAX.
+   *
+   * @param previouslyChosenCell the cell chosen in earlier comparisons.
+   * @param currentCell the cell to compare against it.
+   * @param currentAggOp the comparison operation, GLOBAL_MIN or GLOBAL_MAX.
+   * @param converter converter used to decode the numeric cell values.
+   * @return the cell which is the min (or max) cell.
+   * @throws IOException if a cell value cannot be decoded.
+   */
+  private Cell compareCellValues(Cell previouslyChosenCell, Cell currentCell,
+      AggregationOperation currentAggOp, NumericValueConverter converter)
+      throws IOException {
+    if (previouslyChosenCell == null) {
+      return currentCell;
+    }
+    try {
+      Number previouslyChosenCellValue = (Number)converter.decodeValue(
+          CellUtil.cloneValue(previouslyChosenCell));
+      Number currentCellValue = (Number) converter.decodeValue(CellUtil
+          .cloneValue(currentCell));
+      switch (currentAggOp) {
+      case GLOBAL_MIN:
+        if (converter.compare(
+            currentCellValue, previouslyChosenCellValue) < 0) {
+          // new value is minimum, hence return this cell
+          return currentCell;
+        } else {
+          // previously chosen value is minimum, hence return previous min
+          // cell
+          return previouslyChosenCell;
+        }
+      case GLOBAL_MAX:
+        if (converter.compare(
+            currentCellValue, previouslyChosenCellValue) > 0) {
+          // new value is max, hence return this cell
+          return currentCell;
+        } else {
+          // previously chosen value is max, hence return previous max cell
+          return previouslyChosenCell;
+        }
+      default:
+        return currentCell;
+      }
+    } catch (IllegalArgumentException iae) {
+      LOG.error("caught iae during conversion to long ", iae);
+      return currentCell;
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (flowRunScanner != null) {
+      flowRunScanner.close();
+    } else {
+      LOG.warn("scanner close called but scanner is null");
+    }
+  }
+
+  /**
+   * Called to signal the start of the next() call by the scanner.
+   */
+  public void startNext() {
+    currentRow = null;
+  }
+
+  /**
+   * Returns whether or not the underlying scanner has more rows.
+   */
+  public boolean hasMore() {
+    return currentIndex < availableCells.size() || hasMore;
+  }
+
+  /**
+   * Returns the next available cell for the current row and advances the
+   * pointer to the next cell. This method can be called multiple times in a
+   * row to advance through all the available cells.
+   *
+   * @param scannerContext
+   *          context information for the batch of cells under consideration
+   * @return the next available cell or null if no more cells are available for
+   *         the current row
+   * @throws IOException if any problem is encountered while grabbing the
+   *     next cell.
+   */
+  public Cell nextCell(ScannerContext scannerContext) throws IOException {
+    Cell cell = peekAtNextCell(scannerContext);
+    if (cell != null) {
+      currentIndex++;
+    }
+    return cell;
+  }
+
+  /**
+   * Returns the next available cell for the current row, without advancing the
+   * pointer. Calling this method multiple times in a row will continue to
+   * return the same cell.
+   *
+   * @param scannerContext
+   *          context information for the batch of cells under consideration
+   * @return the next available cell or null if no more cells are available for
+   *         the current row
+   * @throws IOException if any problem is encountered while grabbing the next
+   *     cell.
+   */
+  public Cell peekAtNextCell(ScannerContext scannerContext)
+      throws IOException {
+    if (currentIndex >= availableCells.size()) {
+      // done with current batch
+      availableCells.clear();
+      currentIndex = 0;
+      hasMore = flowRunScanner.next(availableCells, scannerContext);
+    }
+    Cell cell = null;
+    if (currentIndex < availableCells.size()) {
+      cell = availableCells.get(currentIndex);
+      if (currentRow == null) {
+        currentRow = CellUtil.cloneRow(cell);
+      } else if (!CellUtil.matchingRow(cell, currentRow)) {
+        // moved on to the next row
+        // don't use the current cell
+        // also signal no more cells for this row
+        return null;
+      }
+    }
+    return cell;
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see org.apache.hadoop.hbase.regionserver.RegionScanner#getMaxResultSize()
+   */
+  @Override
+  public long getMaxResultSize() {
+    if (regionScanner == null) {
+      throw new IllegalStateException(
+          "RegionScanner.isFilterDone() called when the flow "
+              + "scanner's scanner is not a RegionScanner");
+    }
+    return regionScanner.getMaxResultSize();
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see org.apache.hadoop.hbase.regionserver.RegionScanner#getMvccReadPoint()
+   */
+  @Override
+  public long getMvccReadPoint() {
+    if (regionScanner == null) {
+      throw new IllegalStateException(
+          "RegionScanner.isFilterDone() called when the flow "
+              + "scanner's internal scanner is not a RegionScanner");
+    }
+    return regionScanner.getMvccReadPoint();
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see org.apache.hadoop.hbase.regionserver.RegionScanner#isFilterDone()
+   */
+  @Override
+  public boolean isFilterDone() throws IOException {
+    if (regionScanner == null) {
+      throw new IllegalStateException(
+          "RegionScanner.isFilterDone() called when the flow "
+              + "scanner's internal scanner is not a RegionScanner");
+    }
+    return regionScanner.isFilterDone();
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see org.apache.hadoop.hbase.regionserver.RegionScanner#reseek(byte[])
+   */
+  @Override
+  public boolean reseek(byte[] bytes) throws IOException {
+    if (regionScanner == null) {
+      throw new IllegalStateException(
+          "RegionScanner.reseek() called when the flow "
+              + "scanner's internal scanner is not a RegionScanner");
+    }
+    return regionScanner.reseek(bytes);
+  }
+
+  @Override
+  public int getBatch() {
+    return batchSize;
+  }
+}
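
To see how the tags that FlowScanner inspects get onto cells in the first
place: a writer marks its Put with an aggregation attribute,
FlowRunCoprocessor#prePut converts that attribute into a cell tag, and
FlowScanner later sums the tagged cells on read. A hedged writer-side sketch;
the AggregationOperation#getAttribute() accessor and the row key, family and
qualifier parameters are assumptions based on classes referenced (but not
shown) in this patch:

    import java.io.IOException;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.util.Bytes;

    // Sketch: write one metric value so that it participates in the
    // read-time summation performed by FlowScanner.
    void writeMetric(Table flowRunTable, byte[] rowKey, byte[] family,
        byte[] qualifier, long value) throws IOException {
      Put put = new Put(rowKey);
      put.addColumn(family, qualifier, Bytes.toBytes(value));
      // prePut() turns this attribute into a SUM tag on every cell in the
      // Put (accessor assumed from AggregationOperation, not shown here)
      put.setAttribute(AggregationOperation.SUM.getAttribute(),
          Bytes.toBytes(AggregationOperation.SUM.getAttribute()));
      flowRunTable.put(put);
    }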

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScannerOperation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScannerOperation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScannerOperation.java
new file mode 100644
index 0000000..73c666f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScannerOperation.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+
+/**
+ * Identifies the scanner operation on the {@link FlowRunTable}.
+ */
+public enum FlowScannerOperation {
+
+  /**
+   * If the scanner is opened for reading
+   * during preGet or preScan.
+   */
+  READ,
+
+  /**
+   * If the scanner is opened during preFlush.
+   */
+  FLUSH,
+
+  /**
+   * If the scanner is opened during a minor compaction.
+   */
+  MINOR_COMPACTION,
+
+  /**
+   * If the scanner is opened during a major compaction.
+   */
+  MAJOR_COMPACTION
+}
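
For orientation, each coprocessor hook in FlowRunCoprocessor above opens a
FlowScanner with the matching operation; a small wiring sketch, where the
parameters are placeholders rather than APIs introduced by this patch:

    // READ: preGetOp()/postScannerOpen(); SUM cells are collapsed to a sum.
    // FLUSH / MINOR_COMPACTION: SUM and SUM_FINAL cells are kept as-is.
    // MAJOR_COMPACTION: old SUM_FINAL cells are folded into one flow-level
    // cell; see FlowScanner#processSummationMajorCompaction above.
    InternalScanner wrapForFlush(RegionCoprocessorEnvironment env,
        InternalScanner memstoreScanner) {
      return new FlowScanner(env, memstoreScanner,
          FlowScannerOperation.FLUSH);
    }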

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/package-info.java
new file mode 100644
index 0000000..04963f3
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/package-info.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Package org.apache.hadoop.yarn.server.timelineservice.storage.flow
+ * contains classes related to the implementation of flow related tables,
+ * viz. the flow run table and the flow activity table.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/package-info.java
new file mode 100644
index 0000000..e78db2a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/package-info.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Package org.apache.hadoop.yarn.server.timelineservice.storage contains
+ * classes which define and implement reading and writing to backend storage.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/pom.xml
index 6369864..7e5a803 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/pom.xml
@@ -18,197 +18,24 @@
 <project xmlns="http://maven.apache.org/POM/4.0.0"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
-                             http://maven.apache.org/xsd/maven-4.0.0.xsd">
+                      http://maven.apache.org/xsd/maven-4.0.0.xsd">
   <parent>
     <artifactId>hadoop-yarn-server</artifactId>
     <groupId>org.apache.hadoop</groupId>
     <version>3.2.0-SNAPSHOT</version>
   </parent>
+
   <modelVersion>4.0.0</modelVersion>
   <artifactId>hadoop-yarn-server-timelineservice-hbase</artifactId>
+  <version>3.2.0-SNAPSHOT</version>
   <name>Apache Hadoop YARN TimelineService HBase Backend</name>
+  <packaging>pom</packaging>
 
-  <properties>
-    <!-- Needed for generating FindBugs warnings using parent pom -->
-    <yarn.basedir>${project.parent.parent.basedir}</yarn.basedir>
-  </properties>
-
-  <dependencies>
-    <dependency>
-      <groupId>commons-logging</groupId>
-      <artifactId>commons-logging</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>commons-lang</groupId>
-      <artifactId>commons-lang</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>commons-cli</groupId>
-      <artifactId>commons-cli</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>com.google.guava</groupId>
-      <artifactId>guava</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-annotations</artifactId>
-      <scope>provided</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-common</artifactId>
-      <scope>provided</scope>
-    </dependency>
-
-    <!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-common</artifactId>
-      <type>test-jar</type>
-      <scope>test</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-yarn-api</artifactId>
-      <scope>provided</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-yarn-common</artifactId>
-      <scope>provided</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-yarn-server-applicationhistoryservice</artifactId>
-      <scope>provided</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-yarn-server-timelineservice</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.hbase</groupId>
-      <artifactId>hbase-common</artifactId>
-      <exclusions>
-        <exclusion>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-mapreduce-client-core</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.mortbay.jetty</groupId>
-          <artifactId>jetty-util</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.hbase</groupId>
-      <artifactId>hbase-client</artifactId>
-      <exclusions>
-        <exclusion>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-mapreduce-client-core</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.hbase</groupId>
-      <artifactId>hbase-server</artifactId>
-      <scope>provided</scope>
-      <exclusions>
-        <exclusion>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-hdfs</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-hdfs-client</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-client</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-mapreduce-client-core</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.mortbay.jetty</groupId>
-          <artifactId>jetty</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.mortbay.jetty</groupId>
-          <artifactId>jetty-util</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.mortbay.jetty</groupId>
-          <artifactId>jetty-sslengine</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
+  <modules>
+    <module>hadoop-yarn-server-timelineservice-hbase-client</module>
+    <module>hadoop-yarn-server-timelineservice-hbase-common</module>
+    <module>hadoop-yarn-server-timelineservice-hbase-server</module>
+  </modules>
 
-    <dependency>
-      <groupId>junit</groupId>
-      <artifactId>junit</artifactId>
-      <scope>test</scope>
-    </dependency>
-  </dependencies>
 
-  <build>
-    <plugins>
-      <plugin>
-        <artifactId>maven-jar-plugin</artifactId>
-        <executions>
-          <execution>
-            <goals>
-              <goal>test-jar</goal>
-            </goals>
-            <phase>test-compile</phase>
-          </execution>
-        </executions>
-      </plugin>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-javadoc-plugin</artifactId>
-        <configuration>
-          <additionnalDependencies>
-            <additionnalDependency>
-              <groupId>junit</groupId>
-              <artifactId>junit</artifactId>
-              <version>4.11</version>
-            </additionnalDependency>
-          </additionnalDependencies>
-        </configuration>
-      </plugin>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-dependency-plugin</artifactId>
-        <executions>
-          <execution>
-            <phase>package</phase>
-            <goals>
-              <goal>copy-dependencies</goal>
-            </goals>
-            <configuration>
-              <includeScope>runtime</includeScope>
-              <excludeGroupIds>org.slf4j,org.apache.hadoop,com.github.stephenc.findbugs</excludeGroupIds>
-              <outputDirectory>${project.build.directory}/lib</outputDirectory>
-            </configuration>
-          </execution>
-        </executions>
-      </plugin>
-    </plugins>
-  </build>
-</project>
+</project>
\ No newline at end of file
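
With the module above converted to packaging pom, downstream builds would
depend on the split artifacts directly. A hypothetical consumer-side stanza;
the artifact id is taken from the new <modules> list and the version from the
parent, but the exact artifact and scope a given consumer needs are not
specified by this patch:

    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-yarn-server-timelineservice-hbase-client</artifactId>
      <version>3.2.0-SNAPSHOT</version>
    </dependency>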

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterUtils.java
deleted file mode 100644
index 8b46d32..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterUtils.java
+++ /dev/null
@@ -1,308 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.timelineservice.reader.filter;
-
-import java.io.IOException;
-import java.util.HashSet;
-import java.util.Set;
-
-import org.apache.hadoop.hbase.filter.BinaryComparator;
-import org.apache.hadoop.hbase.filter.BinaryPrefixComparator;
-import org.apache.hadoop.hbase.filter.FamilyFilter;
-import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
-import org.apache.hadoop.hbase.filter.Filter;
-import org.apache.hadoop.hbase.filter.FilterList;
-import org.apache.hadoop.hbase.filter.FilterList.Operator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.Column;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
-import org.apache.hadoop.hbase.filter.QualifierFilter;
-import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Set of utility methods used by timeline filter classes.
- */
-public final class TimelineFilterUtils {
-
-  private static final Logger LOG =
-      LoggerFactory.getLogger(TimelineFilterUtils.class);
-
-  private TimelineFilterUtils() {
-  }
-
-  /**
-   * Returns the equivalent HBase filter list's {@link Operator}.
-   *
-   * @param op timeline filter list operator.
-   * @return HBase filter list's Operator.
-   */
-  private static Operator getHBaseOperator(TimelineFilterList.Operator op) {
-    switch (op) {
-    case AND:
-      return Operator.MUST_PASS_ALL;
-    case OR:
-      return Operator.MUST_PASS_ONE;
-    default:
-      throw new IllegalArgumentException("Invalid operator");
-    }
-  }
-
-  /**
-   * Returns the equivalent HBase compare filter's {@link CompareOp}.
-   *
-   * @param op timeline compare op.
-   * @return HBase compare filter's CompareOp.
-   */
-  private static CompareOp getHBaseCompareOp(
-      TimelineCompareOp op) {
-    switch (op) {
-    case LESS_THAN:
-      return CompareOp.LESS;
-    case LESS_OR_EQUAL:
-      return CompareOp.LESS_OR_EQUAL;
-    case EQUAL:
-      return CompareOp.EQUAL;
-    case NOT_EQUAL:
-      return CompareOp.NOT_EQUAL;
-    case GREATER_OR_EQUAL:
-      return CompareOp.GREATER_OR_EQUAL;
-    case GREATER_THAN:
-      return CompareOp.GREATER;
-    default:
-      throw new IllegalArgumentException("Invalid compare operator");
-    }
-  }
-
-  /**
-   * Converts a {@link TimelinePrefixFilter} to an equivalent HBase
-   * {@link QualifierFilter}.
-   * @param colPrefix column prefix which will be used for conversion.
-   * @param filter timeline prefix filter to be converted.
-   * @return a {@link QualifierFilter} object
-   */
-  private static <T> Filter createHBaseColQualPrefixFilter(
-      ColumnPrefix<T> colPrefix, TimelinePrefixFilter filter) {
-    return new QualifierFilter(getHBaseCompareOp(filter.getCompareOp()),
-        new BinaryPrefixComparator(
-            colPrefix.getColumnPrefixBytes(filter.getPrefix())));
-  }
-
-  /**
-   * Create a HBase {@link QualifierFilter} for the passed column prefix and
-   * compare op.
-   *
-   * @param <T> Describes the type of column prefix.
-   * @param compareOp compare op.
-   * @param columnPrefix column prefix.
-   * @return a column qualifier filter.
-   */
-  public static <T> Filter createHBaseQualifierFilter(CompareOp compareOp,
-      ColumnPrefix<T> columnPrefix) {
-    return new QualifierFilter(compareOp,
-        new BinaryPrefixComparator(
-            columnPrefix.getColumnPrefixBytes("")));
-  }
-
-  /**
-   * Create filters for confs or metrics to retrieve. This list includes a
-   * configs/metrics family filter and relevant filters for confs/metrics to
-   * retrieve, if present.
-   *
-   * @param <T> Describes the type of column prefix.
-   * @param confsOrMetricToRetrieve configs/metrics to retrieve.
-   * @param columnFamily config or metric column family.
-   * @param columnPrefix config or metric column prefix.
-   * @return a filter list.
-   * @throws IOException if any problem occurs while creating the filters.
-   */
-  public static <T> Filter createFilterForConfsOrMetricsToRetrieve(
-      TimelineFilterList confsOrMetricToRetrieve, ColumnFamily<T> columnFamily,
-      ColumnPrefix<T> columnPrefix) throws IOException {
-    Filter familyFilter = new FamilyFilter(CompareOp.EQUAL,
-        new BinaryComparator(columnFamily.getBytes()));
-    if (confsOrMetricToRetrieve != null &&
-        !confsOrMetricToRetrieve.getFilterList().isEmpty()) {
-      // If confsOrMetricsToRetrieve are specified, create a filter list based
-      // on it and the family filter.
-      FilterList filter = new FilterList(familyFilter);
-      filter.addFilter(
-          createHBaseFilterList(columnPrefix, confsOrMetricToRetrieve));
-      return filter;
-    } else {
-      // Only the family filter needs to be added.
-      return familyFilter;
-    }
-  }
-
-  /**
-   * Creates two HBase {@link SingleColumnValueFilter} filters for the
-   * specified value range, represented by a start and an end value, and wraps
-   * them inside a filter list. Start and end value must not be null.
-   *
-   * @param <T> Describes the type of column prefix.
-   * @param column Column for which single column value filter is to be created.
-   * @param startValue Start value.
-   * @param endValue End value.
-   * @return 2 single column value filters wrapped in a filter list.
-   * @throws IOException if any problem is encountered while encoding value.
-   */
-  public static <T> FilterList createSingleColValueFiltersByRange(
-      Column<T> column, Object startValue, Object endValue) throws IOException {
-    FilterList list = new FilterList();
-    Filter singleColValFilterStart = createHBaseSingleColValueFilter(
-        column.getColumnFamilyBytes(), column.getColumnQualifierBytes(),
-        column.getValueConverter().encodeValue(startValue),
-        CompareOp.GREATER_OR_EQUAL, true);
-    list.addFilter(singleColValFilterStart);
-
-    Filter singleColValFilterEnd = createHBaseSingleColValueFilter(
-        column.getColumnFamilyBytes(), column.getColumnQualifierBytes(),
-        column.getValueConverter().encodeValue(endValue),
-        CompareOp.LESS_OR_EQUAL, true);
-    list.addFilter(singleColValFilterEnd);
-    return list;
-  }
-
-  /**
-   * Creates a HBase {@link SingleColumnValueFilter} for the specified column.
-   * @param <T> Describes the type of column.
-   * @param column Column whose value is to be filtered.
-   * @param value Value to be filtered.
-   * @param op Compare operator.
-   * @return a {@link SingleColumnValueFilter} object.
-   * @throws IOException if any problem occurs while encoding the value.
-   */
-  public static <T> Filter createHBaseSingleColValueFilter(Column<T> column,
-      Object value, CompareOp op) throws IOException {
-    Filter singleColValFilter = createHBaseSingleColValueFilter(
-        column.getColumnFamilyBytes(), column.getColumnQualifierBytes(),
-        column.getValueConverter().encodeValue(value), op, true);
-    return singleColValFilter;
-  }
-
-  /**
-   * Creates a HBase {@link SingleColumnValueFilter}.
-   *
-   * @param columnFamily Column Family represented as bytes.
-   * @param columnQualifier Column Qualifier represented as bytes.
-   * @param value Value.
-   * @param compareOp Compare operator.
-   * @param filterIfMissing This flag decides if we should filter the row if the
-   *     specified column is missing. This is based on the filter's keyMustExist
-   *     field.
-   * @return a {@link SingleColumnValueFilter} object
-   * @throws IOException if any problem occurs while creating the filter.
-   */
-  private static SingleColumnValueFilter createHBaseSingleColValueFilter(
-      byte[] columnFamily, byte[] columnQualifier, byte[] value,
-      CompareOp compareOp, boolean filterIfMissing) throws IOException {
-    SingleColumnValueFilter singleColValFilter =
-        new SingleColumnValueFilter(columnFamily, columnQualifier, compareOp,
-        new BinaryComparator(value));
-    singleColValFilter.setLatestVersionOnly(true);
-    singleColValFilter.setFilterIfMissing(filterIfMissing);
-    return singleColValFilter;
-  }
-
-  /**
-   * Fetches columns from a filter list containing exists and multivalue
-   * equality filters. This is done to fetch only the required columns from the
-   * back-end and then match event filters or relationships in the reader.
-   *
-   * @param filterList filter list.
-   * @return set of columns.
-   */
-  public static Set<String> fetchColumnsFromFilterList(
-      TimelineFilterList filterList) {
-    Set<String> strSet = new HashSet<String>();
-    for (TimelineFilter filter : filterList.getFilterList()) {
-      switch(filter.getFilterType()) {
-      case LIST:
-        strSet.addAll(fetchColumnsFromFilterList((TimelineFilterList)filter));
-        break;
-      case KEY_VALUES:
-        strSet.add(((TimelineKeyValuesFilter)filter).getKey());
-        break;
-      case EXISTS:
-        strSet.add(((TimelineExistsFilter)filter).getValue());
-        break;
-      default:
-        LOG.info("Unexpected filter type " + filter.getFilterType());
-        break;
-      }
-    }
-    return strSet;
-  }
-
-  /**
-   * Creates equivalent HBase {@link FilterList} from {@link TimelineFilterList}
-   * while converting different timeline filters (of type {@link TimelineFilter})
-   * into their equivalent HBase filters.
-   *
-   * @param <T> Describes the type of column prefix.
-   * @param colPrefix column prefix which will be used for conversion.
-   * @param filterList timeline filter list which has to be converted.
-   * @return A {@link FilterList} object.
-   * @throws IOException if any problem occurs while creating the filter list.
-   */
-  public static <T> FilterList createHBaseFilterList(ColumnPrefix<T> colPrefix,
-      TimelineFilterList filterList) throws IOException {
-    FilterList list =
-        new FilterList(getHBaseOperator(filterList.getOperator()));
-    for (TimelineFilter filter : filterList.getFilterList()) {
-      switch(filter.getFilterType()) {
-      case LIST:
-        list.addFilter(createHBaseFilterList(colPrefix,
-            (TimelineFilterList)filter));
-        break;
-      case PREFIX:
-        list.addFilter(createHBaseColQualPrefixFilter(colPrefix,
-            (TimelinePrefixFilter)filter));
-        break;
-      case COMPARE:
-        TimelineCompareFilter compareFilter = (TimelineCompareFilter)filter;
-        list.addFilter(
-            createHBaseSingleColValueFilter(
-                colPrefix.getColumnFamilyBytes(),
-                colPrefix.getColumnPrefixBytes(compareFilter.getKey()),
-                colPrefix.getValueConverter().
-                    encodeValue(compareFilter.getValue()),
-                getHBaseCompareOp(compareFilter.getCompareOp()),
-                compareFilter.getKeyMustExist()));
-        break;
-      case KEY_VALUE:
-        TimelineKeyValueFilter kvFilter = (TimelineKeyValueFilter)filter;
-        list.addFilter(
-            createHBaseSingleColValueFilter(
-                colPrefix.getColumnFamilyBytes(),
-                colPrefix.getColumnPrefixBytes(kvFilter.getKey()),
-                colPrefix.getValueConverter().encodeValue(kvFilter.getValue()),
-                getHBaseCompareOp(kvFilter.getCompareOp()),
-                kvFilter.getKeyMustExist()));
-        break;
-      default:
-        LOG.info("Unexpected filter type " + filter.getFilterType());
-        break;
-      }
-    }
-    return list;
-  }
-}
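
For reference, a minimal sketch of how the createHBaseFilterList conversion
above is typically driven. The EntityColumnPrefix.INFO prefix, the filter keys,
and the literal values are assumptions chosen purely for illustration; any
ColumnPrefix implementation from the storage schema would work the same way,
and the TimelineCompareFilter constructor arity shown is assumed.

import java.io.IOException;

import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineCompareFilter;
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineCompareOp;
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList.Operator;
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterUtils;
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelinePrefixFilter;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix;

public final class FilterConversionSketch {
  public static void main(String[] args) throws IOException {
    // AND a qualifier-prefix match with a single-column comparison; nested
    // TimelineFilterLists would be handled by the LIST case above.
    TimelineFilterList timelineFilters = new TimelineFilterList(Operator.AND,
        new TimelinePrefixFilter(TimelineCompareOp.EQUAL, "mapTask"),
        new TimelineCompareFilter(TimelineCompareOp.GREATER_OR_EQUAL,
            "startTime", 1425016501000L, true));

    // PREFIX becomes a QualifierFilter over the encoded column prefix bytes,
    // COMPARE becomes a SingleColumnValueFilter, and AND/OR map to
    // MUST_PASS_ALL/MUST_PASS_ONE respectively.
    FilterList hbaseFilters = TimelineFilterUtils.createHBaseFilterList(
        EntityColumnPrefix.INFO, timelineFilters);
    System.out.println(hbaseFilters);
  }
}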

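A similar sketch for createSingleColValueFiltersByRange above. The column
EntityColumn.CREATED_TIME and the timestamps are assumptions for illustration;
the point is that the two bound filters land in a FilterList whose default
operator is MUST_PASS_ALL, so a row passes only when the latest cell value lies
within the inclusive [start, end] range, and filterIfMissing drops rows that
lack the column entirely.

import java.io.IOException;

import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn;

public final class RangeFilterSketch {
  public static void main(String[] args) throws IOException {
    // One GREATER_OR_EQUAL and one LESS_OR_EQUAL SingleColumnValueFilter,
    // ANDed together by the enclosing FilterList.
    FilterList createdTimeRange =
        TimelineFilterUtils.createSingleColValueFiltersByRange(
            EntityColumn.CREATED_TIME, 1425016501000L, 1425016502000L);
    System.out.println(createdTimeRange);
  }
}
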
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/package-info.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/package-info.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/package-info.java
deleted file mode 100644
index f7c0705..0000000
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/package-info.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Package org.apache.hadoop.yarn.server.timelineservice.reader.filter stores
- * timeline filter implementations.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-package org.apache.hadoop.yarn.server.timelineservice.reader.filter;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java
deleted file mode 100644
index 1ebfab2..0000000
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.timelineservice.storage;
-
-
-import java.io.IOException;
-import java.util.Set;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.service.AbstractService;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
-import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve;
-import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters;
-import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
-import org.apache.hadoop.yarn.server.timelineservice.storage.reader.EntityTypeReader;
-import org.apache.hadoop.yarn.server.timelineservice.storage.reader.TimelineEntityReader;
-import org.apache.hadoop.yarn.server.timelineservice.storage.reader.TimelineEntityReaderFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * HBase-based implementation of {@link TimelineReader}.
- */
-public class HBaseTimelineReaderImpl
-    extends AbstractService implements TimelineReader {
-
-  private static final Logger LOG = LoggerFactory
-      .getLogger(HBaseTimelineReaderImpl.class);
-
-  private Configuration hbaseConf = null;
-  private Connection conn;
-
-  public HBaseTimelineReaderImpl() {
-    super(HBaseTimelineReaderImpl.class.getName());
-  }
-
-  @Override
-  public void serviceInit(Configuration conf) throws Exception {
-    super.serviceInit(conf);
-    hbaseConf = HBaseTimelineStorageUtils.getTimelineServiceHBaseConf(conf);
-    conn = ConnectionFactory.createConnection(hbaseConf);
-  }
-
-  @Override
-  protected void serviceStop() throws Exception {
-    if (conn != null) {
-      LOG.info("closing the hbase Connection");
-      conn.close();
-    }
-    super.serviceStop();
-  }
-
-  @Override
-  public TimelineEntity getEntity(TimelineReaderContext context,
-      TimelineDataToRetrieve dataToRetrieve) throws IOException {
-    TimelineEntityReader reader =
-        TimelineEntityReaderFactory.createSingleEntityReader(context,
-            dataToRetrieve);
-    return reader.readEntity(hbaseConf, conn);
-  }
-
-  @Override
-  public Set<TimelineEntity> getEntities(TimelineReaderContext context,
-      TimelineEntityFilters filters, TimelineDataToRetrieve dataToRetrieve)
-      throws IOException {
-    TimelineEntityReader reader =
-        TimelineEntityReaderFactory.createMultipleEntitiesReader(context,
-            filters, dataToRetrieve);
-    return reader.readEntities(hbaseConf, conn);
-  }
-
-  @Override
-  public Set<String> getEntityTypes(TimelineReaderContext context)
-      throws IOException {
-    EntityTypeReader reader = new EntityTypeReader(context);
-    return reader.readEntityTypes(hbaseConf, conn);
-  }
-}
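
And a minimal sketch of the reader's service lifecycle as a caller would
exercise it: init() drives serviceInit(), which derives the HBase client
configuration and opens a shared Connection, and stop() drives serviceStop(),
which closes it. The cluster/user/flow identifiers and the
TimelineReaderContext constructor arity shown here are assumptions for
illustration only.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineReaderImpl;

public final class ReaderLifecycleSketch {
  public static void main(String[] args) throws Exception {
    HBaseTimelineReaderImpl reader = new HBaseTimelineReaderImpl();
    try {
      reader.init(new Configuration()); // serviceInit: opens the Connection
      reader.start();

      // Identify a single YARN_APPLICATION entity; all ids are made up.
      TimelineReaderContext context = new TimelineReaderContext("cluster1",
          "user1", "flow1", 1L, "app_1111111111_0001", "YARN_APPLICATION",
          "app_1111111111_0001");
      TimelineEntity entity =
          reader.getEntity(context, new TimelineDataToRetrieve());
      System.out.println(entity == null ? "not found" : entity.getId());
    } finally {
      reader.stop(); // serviceStop: closes the Connection
    }
  }
}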

