http://git-wip-us.apache.org/repos/asf/hadoop/blob/b92089c0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
new file mode 100644
index 0000000..0e3c8ee
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
@@ -0,0 +1,728 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScannerContext;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Bytes.ByteArrayComparator;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import 
org.apache.hadoop.yarn.server.timelineservice.storage.common.GenericConverter;
+import 
org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
+import 
org.apache.hadoop.yarn.server.timelineservice.storage.common.NumericValueConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+import 
org.apache.hadoop.yarn.server.timelineservice.storage.common.TimestampGenerator;
+import 
org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Invoked via the coprocessor when a Get or a Scan is issued for flow run
+ * table. Looks through the list of cells per row, checks their tags and does
+ * operation on those cells as per the cell tags. Transforms reads of the 
stored
+ * metrics into calculated sums for each column Also, finds the min and max for
+ * start and end times in a flow run.
+ */
+class FlowScanner implements RegionScanner, Closeable {
+
+  private static final Log LOG = LogFactory.getLog(FlowScanner.class);
+
+  /**
+   * use a special application id to represent the flow id this is needed since
+   * TimestampGenerator parses the app id to generate a cell timestamp.
+   */
+  private static final String FLOW_APP_ID = "application_00000000000_0000";
+
+  private final Region region;
+  private final InternalScanner flowRunScanner;
+  private final int batchSize;
+  private final long appFinalValueRetentionThreshold;
+  private RegionScanner regionScanner;
+  private boolean hasMore;
+  private byte[] currentRow;
+  private List<Cell> availableCells = new ArrayList<>();
+  private int currentIndex;
+  private FlowScannerOperation action = FlowScannerOperation.READ;
+
+  FlowScanner(RegionCoprocessorEnvironment env, InternalScanner 
internalScanner,
+      FlowScannerOperation action) {
+    this(env, null, internalScanner, action);
+  }
+
+  FlowScanner(RegionCoprocessorEnvironment env, Scan incomingScan,
+      InternalScanner internalScanner, FlowScannerOperation action) {
+    this.batchSize = incomingScan == null ? -1 : incomingScan.getBatch();
+    // TODO initialize other scan attributes like Scan#maxResultSize
+    this.flowRunScanner = internalScanner;
+    if (internalScanner instanceof RegionScanner) {
+      this.regionScanner = (RegionScanner) internalScanner;
+    }
+    this.action = action;
+    if (env == null) {
+      this.appFinalValueRetentionThreshold =
+          YarnConfiguration.DEFAULT_APP_FINAL_VALUE_RETENTION_THRESHOLD;
+      this.region = null;
+    } else {
+      this.region = env.getRegion();
+      Configuration hbaseConf = env.getConfiguration();
+      this.appFinalValueRetentionThreshold = hbaseConf.getLong(
+          YarnConfiguration.APP_FINAL_VALUE_RETENTION_THRESHOLD,
+          YarnConfiguration.DEFAULT_APP_FINAL_VALUE_RETENTION_THRESHOLD);
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(" batch size=" + batchSize);
+    }
+  }
+
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see org.apache.hadoop.hbase.regionserver.RegionScanner#getRegionInfo()
+   */
+  @Override
+  public HRegionInfo getRegionInfo() {
+    return region.getRegionInfo();
+  }
+
+  @Override
+  public boolean nextRaw(List<Cell> cells) throws IOException {
+    return nextRaw(cells, ScannerContext.newBuilder().build());
+  }
+
+  @Override
+  public boolean nextRaw(List<Cell> cells, ScannerContext scannerContext)
+      throws IOException {
+    return nextInternal(cells, scannerContext);
+  }
+
+  @Override
+  public boolean next(List<Cell> cells) throws IOException {
+    return next(cells, ScannerContext.newBuilder().build());
+  }
+
+  @Override
+  public boolean next(List<Cell> cells, ScannerContext scannerContext)
+      throws IOException {
+    return nextInternal(cells, scannerContext);
+  }
+
+  /**
+   * Get value converter associated with a column or a column prefix. If 
nothing
+   * matches, generic converter is returned.
+   * @param colQualifierBytes
+   * @return value converter implementation.
+   */
+  private static ValueConverter getValueConverter(byte[] colQualifierBytes) {
+    // Iterate over all the column prefixes for flow run table and get the
+    // appropriate converter for the column qualifier passed if prefix matches.
+    for (FlowRunColumnPrefix colPrefix : FlowRunColumnPrefix.values()) {
+      byte[] colPrefixBytes = colPrefix.getColumnPrefixBytes("");
+      if (Bytes.compareTo(colPrefixBytes, 0, colPrefixBytes.length,
+          colQualifierBytes, 0, colPrefixBytes.length) == 0) {
+        return colPrefix.getValueConverter();
+      }
+    }
+    // Iterate over all the columns for flow run table and get the
+    // appropriate converter for the column qualifier passed if match occurs.
+    for (FlowRunColumn column : FlowRunColumn.values()) {
+      if (Bytes.compareTo(
+          column.getColumnQualifierBytes(), colQualifierBytes) == 0) {
+        return column.getValueConverter();
+      }
+    }
+    // Return generic converter if nothing matches.
+    return GenericConverter.getInstance();
+  }
+
+  /**
+   * This method loops through the cells in a given row of the
+   * {@link FlowRunTable}. It looks at the tags of each cell to figure out how
+   * to process the contents. It then calculates the sum or min or max for each
+   * column or returns the cell as is.
+   *
+   * @param cells
+   * @param scannerContext
+   * @return true if next row is available for the scanner, false otherwise
+   * @throws IOException
+   */
+  private boolean nextInternal(List<Cell> cells, ScannerContext scannerContext)
+      throws IOException {
+    Cell cell = null;
+    startNext();
+    // Loop through all the cells in this row
+    // For min/max/metrics we do need to scan the entire set of cells to get 
the
+    // right one
+    // But with flush/compaction, the number of cells being scanned will go 
down
+    // cells are grouped per column qualifier then sorted by cell timestamp
+    // (latest to oldest) per column qualifier
+    // So all cells in one qualifier come one after the other before we see the
+    // next column qualifier
+    ByteArrayComparator comp = new ByteArrayComparator();
+    byte[] previousColumnQualifier = Separator.EMPTY_BYTES;
+    AggregationOperation currentAggOp = null;
+    SortedSet<Cell> currentColumnCells = new TreeSet<>(KeyValue.COMPARATOR);
+    Set<String> alreadySeenAggDim = new HashSet<>();
+    int addedCnt = 0;
+    long currentTimestamp = System.currentTimeMillis();
+    ValueConverter converter = null;
+    int limit = batchSize;
+
+    while (limit <= 0 || addedCnt < limit) {
+      cell = peekAtNextCell(scannerContext);
+      if (cell == null) {
+        break;
+      }
+      byte[] currentColumnQualifier = CellUtil.cloneQualifier(cell);
+      if (previousColumnQualifier == null) {
+        // first time in loop
+        previousColumnQualifier = currentColumnQualifier;
+      }
+
+      converter = getValueConverter(currentColumnQualifier);
+      if (comp.compare(previousColumnQualifier, currentColumnQualifier) != 0) {
+        addedCnt += emitCells(cells, currentColumnCells, currentAggOp,
+            converter, currentTimestamp);
+        resetState(currentColumnCells, alreadySeenAggDim);
+        previousColumnQualifier = currentColumnQualifier;
+        currentAggOp = getCurrentAggOp(cell);
+        converter = getValueConverter(currentColumnQualifier);
+      }
+      collectCells(currentColumnCells, currentAggOp, cell, alreadySeenAggDim,
+          converter, scannerContext);
+      nextCell(scannerContext);
+    }
+    if ((!currentColumnCells.isEmpty()) && ((limit <= 0 || addedCnt < limit))) 
{
+      addedCnt += emitCells(cells, currentColumnCells, currentAggOp, converter,
+          currentTimestamp);
+      if (LOG.isDebugEnabled()) {
+        if (addedCnt > 0) {
+          LOG.debug("emitted cells. " + addedCnt + " for " + this.action
+              + " rowKey="
+              + FlowRunRowKey.parseRowKey(CellUtil.cloneRow(cells.get(0))));
+        } else {
+          LOG.debug("emitted no cells for " + this.action);
+        }
+      }
+    }
+    return hasMore();
+  }
+
+  private AggregationOperation getCurrentAggOp(Cell cell) {
+    List<Tag> tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(),
+        cell.getTagsLength());
+    // We assume that all the operations for a particular column are the same
+    return HBaseTimelineStorageUtils.getAggregationOperationFromTagsList(tags);
+  }
+
+  /**
+   * resets the parameters to an initialized state for next loop iteration.
+   *
+   * @param cell
+   * @param currentAggOp
+   * @param currentColumnCells
+   * @param alreadySeenAggDim
+   * @param collectedButNotEmitted
+   */
+  private void resetState(SortedSet<Cell> currentColumnCells,
+      Set<String> alreadySeenAggDim) {
+    currentColumnCells.clear();
+    alreadySeenAggDim.clear();
+  }
+
+  private void collectCells(SortedSet<Cell> currentColumnCells,
+      AggregationOperation currentAggOp, Cell cell,
+      Set<String> alreadySeenAggDim, ValueConverter converter,
+      ScannerContext scannerContext) throws IOException {
+
+    if (currentAggOp == null) {
+      // not a min/max/metric cell, so just return it as is
+      currentColumnCells.add(cell);
+      return;
+    }
+
+    switch (currentAggOp) {
+    case GLOBAL_MIN:
+      if (currentColumnCells.size() == 0) {
+        currentColumnCells.add(cell);
+      } else {
+        Cell currentMinCell = currentColumnCells.first();
+        Cell newMinCell = compareCellValues(currentMinCell, cell, currentAggOp,
+            (NumericValueConverter) converter);
+        if (!currentMinCell.equals(newMinCell)) {
+          currentColumnCells.remove(currentMinCell);
+          currentColumnCells.add(newMinCell);
+        }
+      }
+      break;
+    case GLOBAL_MAX:
+      if (currentColumnCells.size() == 0) {
+        currentColumnCells.add(cell);
+      } else {
+        Cell currentMaxCell = currentColumnCells.first();
+        Cell newMaxCell = compareCellValues(currentMaxCell, cell, currentAggOp,
+            (NumericValueConverter) converter);
+        if (!currentMaxCell.equals(newMaxCell)) {
+          currentColumnCells.remove(currentMaxCell);
+          currentColumnCells.add(newMaxCell);
+        }
+      }
+      break;
+    case SUM:
+    case SUM_FINAL:
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("In collect cells "
+            + " FlowSannerOperation="
+            + this.action
+            + " currentAggOp="
+            + currentAggOp
+            + " cell qualifier="
+            + Bytes.toString(CellUtil.cloneQualifier(cell))
+            + " cell value= "
+            + converter.decodeValue(CellUtil.cloneValue(cell))
+            + " timestamp=" + cell.getTimestamp());
+      }
+
+      // only if this app has not been seen yet, add to current column cells
+      List<Tag> tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(),
+          cell.getTagsLength());
+      String aggDim = HBaseTimelineStorageUtils
+          .getAggregationCompactionDimension(tags);
+      if (!alreadySeenAggDim.contains(aggDim)) {
+        // if this agg dimension has already been seen,
+        // since they show up in sorted order
+        // we drop the rest which are older
+        // in other words, this cell is older than previously seen cells
+        // for that agg dim
+        // but when this agg dim is not seen,
+        // consider this cell in our working set
+        currentColumnCells.add(cell);
+        alreadySeenAggDim.add(aggDim);
+      }
+      break;
+    default:
+      break;
+    } // end of switch case
+  }
+
+  /*
+   * Processes the cells in input param currentColumnCells and populates
+   * List<Cell> cells as the output based on the input AggregationOperation
+   * parameter.
+   */
+  private int emitCells(List<Cell> cells, SortedSet<Cell> currentColumnCells,
+      AggregationOperation currentAggOp, ValueConverter converter,
+      long currentTimestamp) throws IOException {
+    if ((currentColumnCells == null) || (currentColumnCells.size() == 0)) {
+      return 0;
+    }
+    if (currentAggOp == null) {
+      cells.addAll(currentColumnCells);
+      return currentColumnCells.size();
+    }
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("In emitCells " + this.action + " currentColumnCells size= "
+          + currentColumnCells.size() + " currentAggOp" + currentAggOp);
+    }
+
+    switch (currentAggOp) {
+    case GLOBAL_MIN:
+    case GLOBAL_MAX:
+      cells.addAll(currentColumnCells);
+      return currentColumnCells.size();
+    case SUM:
+    case SUM_FINAL:
+      switch (action) {
+      case FLUSH:
+      case MINOR_COMPACTION:
+        cells.addAll(currentColumnCells);
+        return currentColumnCells.size();
+      case READ:
+        Cell sumCell = processSummation(currentColumnCells,
+            (NumericValueConverter) converter);
+        cells.add(sumCell);
+        return 1;
+      case MAJOR_COMPACTION:
+        List<Cell> finalCells = processSummationMajorCompaction(
+            currentColumnCells, (NumericValueConverter) converter,
+            currentTimestamp);
+        cells.addAll(finalCells);
+        return finalCells.size();
+      default:
+        cells.addAll(currentColumnCells);
+        return currentColumnCells.size();
+      }
+    default:
+      cells.addAll(currentColumnCells);
+      return currentColumnCells.size();
+    }
+  }
+
+  /*
+   * Returns a cell whose value is the sum of all cell values in the input set.
+   * The new cell created has the timestamp of the most recent metric cell. The
+   * sum of a metric for a flow run is the summation at the point of the last
+   * metric update in that flow till that time.
+   */
+  private Cell processSummation(SortedSet<Cell> currentColumnCells,
+      NumericValueConverter converter) throws IOException {
+    Number sum = 0;
+    Number currentValue = 0;
+    long ts = 0L;
+    long mostCurrentTimestamp = 0L;
+    Cell mostRecentCell = null;
+    for (Cell cell : currentColumnCells) {
+      currentValue = (Number) converter.decodeValue(CellUtil.cloneValue(cell));
+      ts = cell.getTimestamp();
+      if (mostCurrentTimestamp < ts) {
+        mostCurrentTimestamp = ts;
+        mostRecentCell = cell;
+      }
+      sum = converter.add(sum, currentValue);
+    }
+    byte[] sumBytes = converter.encodeValue(sum);
+    Cell sumCell =
+        HBaseTimelineStorageUtils.createNewCell(mostRecentCell, sumBytes);
+    return sumCell;
+  }
+
+
+  /**
+   * Returns a list of cells that contains
+   *
+   * A) the latest cells for applications that haven't finished yet
+   * B) summation
+   * for the flow, based on applications that have completed and are older than
+   * a certain time
+   *
+   * The new cell created has the timestamp of the most recent metric cell. The
+   * sum of a metric for a flow run is the summation at the point of the last
+   * metric update in that flow till that time.
+   */
+  @VisibleForTesting
+  List<Cell> processSummationMajorCompaction(
+      SortedSet<Cell> currentColumnCells, NumericValueConverter converter,
+      long currentTimestamp)
+      throws IOException {
+    Number sum = 0;
+    Number currentValue = 0;
+    long ts = 0L;
+    boolean summationDone = false;
+    List<Cell> finalCells = new ArrayList<Cell>();
+    if (currentColumnCells == null) {
+      return finalCells;
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("In processSummationMajorCompaction,"
+          + " will drop cells older than " + currentTimestamp
+          + " CurrentColumnCells size=" + currentColumnCells.size());
+    }
+
+    for (Cell cell : currentColumnCells) {
+      AggregationOperation cellAggOp = getCurrentAggOp(cell);
+      // if this is the existing flow sum cell
+      List<Tag> tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(),
+          cell.getTagsLength());
+      String appId = HBaseTimelineStorageUtils
+          .getAggregationCompactionDimension(tags);
+      if (appId == FLOW_APP_ID) {
+        sum = converter.add(sum, currentValue);
+        summationDone = true;
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("reading flow app id sum=" + sum);
+        }
+      } else {
+        currentValue = (Number) converter.decodeValue(CellUtil
+            .cloneValue(cell));
+        // read the timestamp truncated by the generator
+        ts =  TimestampGenerator.getTruncatedTimestamp(cell.getTimestamp());
+        if ((cellAggOp == AggregationOperation.SUM_FINAL)
+            && ((ts + this.appFinalValueRetentionThreshold)
+                < currentTimestamp)) {
+          sum = converter.add(sum, currentValue);
+          summationDone = true;
+          if (LOG.isTraceEnabled()) {
+            LOG.trace("MAJOR COMPACTION loop sum= " + sum
+                + " discarding now: " + " qualifier="
+                + Bytes.toString(CellUtil.cloneQualifier(cell)) + " value="
+                + converter.decodeValue(CellUtil.cloneValue(cell))
+                + " timestamp=" + cell.getTimestamp() + " " + this.action);
+          }
+        } else {
+          // not a final value but it's the latest cell for this app
+          // so include this cell in the list of cells to write back
+          finalCells.add(cell);
+        }
+      }
+    }
+    if (summationDone) {
+      Cell anyCell = currentColumnCells.first();
+      List<Tag> tags = new ArrayList<Tag>();
+      Tag t = new Tag(AggregationOperation.SUM_FINAL.getTagType(),
+          Bytes.toBytes(FLOW_APP_ID));
+      tags.add(t);
+      t = new Tag(AggregationCompactionDimension.APPLICATION_ID.getTagType(),
+          Bytes.toBytes(FLOW_APP_ID));
+      tags.add(t);
+      byte[] tagByteArray = Tag.fromList(tags);
+      Cell sumCell = HBaseTimelineStorageUtils.createNewCell(
+          CellUtil.cloneRow(anyCell),
+          CellUtil.cloneFamily(anyCell),
+          CellUtil.cloneQualifier(anyCell),
+          TimestampGenerator.getSupplementedTimestamp(
+              System.currentTimeMillis(), FLOW_APP_ID),
+              converter.encodeValue(sum), tagByteArray);
+      finalCells.add(sumCell);
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("MAJOR COMPACTION final sum= " + sum + " for "
+            + Bytes.toString(CellUtil.cloneQualifier(sumCell))
+            + " " + this.action);
+      }
+      LOG.info("After major compaction for qualifier="
+          + Bytes.toString(CellUtil.cloneQualifier(sumCell))
+          + " with currentColumnCells.size="
+          + currentColumnCells.size()
+          + " returning finalCells.size=" + finalCells.size()
+          + " with sum=" + sum.longValue()
+          + " with cell timestamp " + sumCell.getTimestamp());
+    } else {
+      String qualifier = "";
+      LOG.info("After major compaction for qualifier=" + qualifier
+          + " with currentColumnCells.size="
+          + currentColumnCells.size()
+          + " returning finalCells.size=" + finalCells.size()
+          + " with zero sum="
+          + sum.longValue());
+    }
+    return finalCells;
+  }
+
+  /**
+   * Determines which cell is to be returned based on the values in each cell
+   * and the comparison operation MIN or MAX.
+   *
+   * @param previouslyChosenCell
+   * @param currentCell
+   * @param currentAggOp
+   * @return the cell which is the min (or max) cell
+   * @throws IOException
+   */
+  private Cell compareCellValues(Cell previouslyChosenCell, Cell currentCell,
+      AggregationOperation currentAggOp, NumericValueConverter converter)
+      throws IOException {
+    if (previouslyChosenCell == null) {
+      return currentCell;
+    }
+    try {
+      Number previouslyChosenCellValue = (Number)converter.decodeValue(
+          CellUtil.cloneValue(previouslyChosenCell));
+      Number currentCellValue = (Number) converter.decodeValue(CellUtil
+          .cloneValue(currentCell));
+      switch (currentAggOp) {
+      case GLOBAL_MIN:
+        if (converter.compare(
+            currentCellValue, previouslyChosenCellValue) < 0) {
+          // new value is minimum, hence return this cell
+          return currentCell;
+        } else {
+          // previously chosen value is miniumum, hence return previous min 
cell
+          return previouslyChosenCell;
+        }
+      case GLOBAL_MAX:
+        if (converter.compare(
+            currentCellValue, previouslyChosenCellValue) > 0) {
+          // new value is max, hence return this cell
+          return currentCell;
+        } else {
+          // previously chosen value is max, hence return previous max cell
+          return previouslyChosenCell;
+        }
+      default:
+        return currentCell;
+      }
+    } catch (IllegalArgumentException iae) {
+      LOG.error("caught iae during conversion to long ", iae);
+      return currentCell;
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (flowRunScanner != null) {
+      flowRunScanner.close();
+    } else {
+      LOG.warn("scanner close called but scanner is null");
+    }
+  }
+
+  /**
+   * Called to signal the start of the next() call by the scanner.
+   */
+  public void startNext() {
+    currentRow = null;
+  }
+
+  /**
+   * Returns whether or not the underlying scanner has more rows.
+   */
+  public boolean hasMore() {
+    return currentIndex < availableCells.size() ? true : hasMore;
+  }
+
+  /**
+   * Returns the next available cell for the current row and advances the
+   * pointer to the next cell. This method can be called multiple times in a 
row
+   * to advance through all the available cells.
+   *
+   * @param scannerContext
+   *          context information for the batch of cells under consideration
+   * @return the next available cell or null if no more cells are available for
+   *         the current row
+   * @throws IOException
+   */
+  public Cell nextCell(ScannerContext scannerContext) throws IOException {
+    Cell cell = peekAtNextCell(scannerContext);
+    if (cell != null) {
+      currentIndex++;
+    }
+    return cell;
+  }
+
+  /**
+   * Returns the next available cell for the current row, without advancing the
+   * pointer. Calling this method multiple times in a row will continue to
+   * return the same cell.
+   *
+   * @param scannerContext
+   *          context information for the batch of cells under consideration
+   * @return the next available cell or null if no more cells are available for
+   *         the current row
+   * @throws IOException if any problem is encountered while grabbing the next
+   *     cell.
+   */
+  public Cell peekAtNextCell(ScannerContext scannerContext) throws IOException 
{
+    if (currentIndex >= availableCells.size()) {
+      // done with current batch
+      availableCells.clear();
+      currentIndex = 0;
+      hasMore = flowRunScanner.next(availableCells, scannerContext);
+    }
+    Cell cell = null;
+    if (currentIndex < availableCells.size()) {
+      cell = availableCells.get(currentIndex);
+      if (currentRow == null) {
+        currentRow = CellUtil.cloneRow(cell);
+      } else if (!CellUtil.matchingRow(cell, currentRow)) {
+        // moved on to the next row
+        // don't use the current cell
+        // also signal no more cells for this row
+        return null;
+      }
+    }
+    return cell;
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see org.apache.hadoop.hbase.regionserver.RegionScanner#getMaxResultSize()
+   */
+  @Override
+  public long getMaxResultSize() {
+    if (regionScanner == null) {
+      throw new IllegalStateException(
+          "RegionScanner.isFilterDone() called when the flow "
+              + "scanner's scanner is not a RegionScanner");
+    }
+    return regionScanner.getMaxResultSize();
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see org.apache.hadoop.hbase.regionserver.RegionScanner#getMvccReadPoint()
+   */
+  @Override
+  public long getMvccReadPoint() {
+    if (regionScanner == null) {
+      throw new IllegalStateException(
+          "RegionScanner.isFilterDone() called when the flow "
+              + "scanner's internal scanner is not a RegionScanner");
+    }
+    return regionScanner.getMvccReadPoint();
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see org.apache.hadoop.hbase.regionserver.RegionScanner#isFilterDone()
+   */
+  @Override
+  public boolean isFilterDone() throws IOException {
+    if (regionScanner == null) {
+      throw new IllegalStateException(
+          "RegionScanner.isFilterDone() called when the flow "
+              + "scanner's internal scanner is not a RegionScanner");
+    }
+    return regionScanner.isFilterDone();
+
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see org.apache.hadoop.hbase.regionserver.RegionScanner#reseek(byte[])
+   */
+  @Override
+  public boolean reseek(byte[] bytes) throws IOException {
+    if (regionScanner == null) {
+      throw new IllegalStateException(
+          "RegionScanner.reseek() called when the flow "
+              + "scanner's internal scanner is not a RegionScanner");
+    }
+    return regionScanner.reseek(bytes);
+  }
+
+  @Override
+  public int getBatch() {
+    return batchSize;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b92089c0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScannerOperation.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScannerOperation.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScannerOperation.java
new file mode 100644
index 0000000..73c666f
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScannerOperation.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+
+/**
+ * Identifies the scanner operation on the {@link FlowRunTable}.
+ */
+public enum FlowScannerOperation {
+
+  /**
+   * If the scanner is opened for reading
+   * during preGet or preScan.
+   */
+  READ,
+
+  /**
+   * If the scanner is opened during preFlush.
+   */
+  FLUSH,
+
+  /**
+   * If the scanner is opened during minor Compaction.
+   */
+  MINOR_COMPACTION,
+
+  /**
+   * If the scanner is opened during major Compaction.
+   */
+  MAJOR_COMPACTION
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b92089c0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/package-info.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/package-info.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/package-info.java
new file mode 100644
index 0000000..04963f3
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/package-info.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Package org.apache.hadoop.yarn.server.timelineservice.storage.flow
+ * contains classes related to implementation for flow related tables, viz. 
flow
+ * run table and flow activity table.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b92089c0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/package-info.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/package-info.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/package-info.java
new file mode 100644
index 0000000..e78db2a
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/package-info.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Package org.apache.hadoop.yarn.server.timelineservice.storage contains
+ * classes which define and implement reading and writing to backend storage.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b92089c0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/AbstractTimelineStorageReader.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/AbstractTimelineStorageReader.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/AbstractTimelineStorageReader.java
new file mode 100644
index 0000000..5bacf66
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/AbstractTimelineStorageReader.java
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.reader;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import 
org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
+import 
org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowColumnPrefix;
+import 
org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey;
+import 
org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
+import org.apache.hadoop.yarn.webapp.NotFoundException;
+
+/**
+ * The base class for reading timeline data from the HBase storage. This class
+ * provides basic support to validate and augment reader context.
+ */
+public abstract class AbstractTimelineStorageReader {
+
+  private final TimelineReaderContext context;
+  /**
+   * Used to look up the flow context.
+   */
+  private final AppToFlowTable appToFlowTable = new AppToFlowTable();
+
+  public AbstractTimelineStorageReader(TimelineReaderContext ctxt) {
+    context = ctxt;
+  }
+
+  protected TimelineReaderContext getContext() {
+    return context;
+  }
+
+  /**
+   * Looks up flow context from AppToFlow table.
+   *
+   * @param appToFlowRowKey to identify Cluster and App Ids.
+   * @param clusterId the cluster id.
+   * @param hbaseConf HBase configuration.
+   * @param conn HBase Connection.
+   * @return flow context information.
+   * @throws IOException if any problem occurs while fetching flow information.
+   */
+  protected FlowContext lookupFlowContext(AppToFlowRowKey appToFlowRowKey,
+      String clusterId, Configuration hbaseConf, Connection conn)
+      throws IOException {
+    byte[] rowKey = appToFlowRowKey.getRowKey();
+    Get get = new Get(rowKey);
+    Result result = appToFlowTable.getResult(hbaseConf, conn, get);
+    if (result != null && !result.isEmpty()) {
+      Object flowName =
+          AppToFlowColumnPrefix.FLOW_NAME.readResult(result, clusterId);
+      Object flowRunId =
+          AppToFlowColumnPrefix.FLOW_RUN_ID.readResult(result, clusterId);
+      Object userId =
+          AppToFlowColumnPrefix.USER_ID.readResult(result, clusterId);
+      if (flowName == null || userId == null || flowRunId == null) {
+        throw new NotFoundException(
+            "Unable to find the context flow name, and flow run id, "
+            + "and user id for clusterId=" + clusterId
+            + ", appId=" + appToFlowRowKey.getAppId());
+      }
+      return new FlowContext((String)userId, (String)flowName,
+          ((Number)flowRunId).longValue());
+    } else {
+      throw new NotFoundException(
+          "Unable to find the context flow name, and flow run id, "
+          + "and user id for clusterId=" + clusterId
+          + ", appId=" + appToFlowRowKey.getAppId());
+    }
+  }
+
+  /**
+    * Sets certain parameters to defaults if the values are not provided.
+    *
+    * @param hbaseConf HBase Configuration.
+    * @param conn HBase Connection.
+    * @throws IOException if any exception is encountered while setting params.
+    */
+  protected void augmentParams(Configuration hbaseConf, Connection conn)
+      throws IOException {
+    defaultAugmentParams(hbaseConf, conn);
+  }
+
+  /**
+   * Default behavior for all timeline readers to augment parameters.
+   *
+   * @param hbaseConf HBase Configuration.
+   * @param conn HBase Connection.
+   * @throws IOException if any exception is encountered while setting params.
+   */
+  final protected void defaultAugmentParams(Configuration hbaseConf,
+      Connection conn) throws IOException {
+    // In reality all three should be null or neither should be null
+    if (context.getFlowName() == null || context.getFlowRunId() == null
+        || context.getUserId() == null) {
+      // Get flow context information from AppToFlow table.
+      AppToFlowRowKey appToFlowRowKey =
+          new AppToFlowRowKey(context.getAppId());
+      FlowContext flowContext =
+          lookupFlowContext(appToFlowRowKey, context.getClusterId(), hbaseConf,
+          conn);
+      context.setFlowName(flowContext.flowName);
+      context.setFlowRunId(flowContext.flowRunId);
+      context.setUserId(flowContext.userId);
+    }
+  }
+
+  /**
+   * Validates the required parameters to read the entities.
+   */
+  protected abstract void validateParams();
+
+  /**
+   * Encapsulates flow context information.
+   */
+  protected static class FlowContext {
+    private final String userId;
+    private final String flowName;
+    private final Long flowRunId;
+
+    public FlowContext(String user, String flowName, Long flowRunId) {
+      this.userId = user;
+      this.flowName = flowName;
+      this.flowRunId = flowRunId;
+    }
+
+    protected String getUserId() {
+      return userId;
+    }
+
+    protected String getFlowName() {
+      return flowName;
+    }
+
+    protected Long getFlowRunId() {
+      return flowRunId;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b92089c0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java
new file mode 100644
index 0000000..4e8286d
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java
@@ -0,0 +1,502 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.reader;
+
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.BinaryComparator;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.filter.FamilyFilter;
+import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.filter.FilterList.Operator;
+import org.apache.hadoop.hbase.filter.PageFilter;
+import org.apache.hadoop.hbase.filter.QualifierFilter;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
+import 
org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve;
+import 
org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters;
+import 
org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
+import 
org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
+import 
org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterUtils;
+import 
org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
+import 
org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumn;
+import 
org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnFamily;
+import 
org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix;
+import 
org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey;
+import 
org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKeyPrefix;
+import 
org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
+import 
org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
+import 
org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
+import 
org.apache.hadoop.yarn.server.timelineservice.storage.common.RowKeyPrefix;
+import 
org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Timeline entity reader for application entities that are stored in the
+ * application table.
+ */
+class ApplicationEntityReader extends GenericEntityReader {
+  private static final ApplicationTable APPLICATION_TABLE =
+      new ApplicationTable();
+
+  public ApplicationEntityReader(TimelineReaderContext ctxt,
+      TimelineEntityFilters entityFilters, TimelineDataToRetrieve toRetrieve) {
+    super(ctxt, entityFilters, toRetrieve);
+  }
+
+  public ApplicationEntityReader(TimelineReaderContext ctxt,
+      TimelineDataToRetrieve toRetrieve) {
+    super(ctxt, toRetrieve);
+  }
+
+  /**
+   * Uses the {@link ApplicationTable}.
+   */
+  protected BaseTable<?> getTable() {
+    return APPLICATION_TABLE;
+  }
+
+  /**
+   * This method is called only for multiple entity reads.
+   */
+  @Override
+  protected FilterList constructFilterListBasedOnFilters() throws IOException {
+    // Filters here cannot be null for multiple entity reads as they are set in
+    // augmentParams if null.
+    TimelineEntityFilters filters = getFilters();
+    FilterList listBasedOnFilters = new FilterList();
+    // Create filter list based on created time range and add it to
+    // listBasedOnFilters.
+    long createdTimeBegin = filters.getCreatedTimeBegin();
+    long createdTimeEnd = filters.getCreatedTimeEnd();
+    if (createdTimeBegin != 0 || createdTimeEnd != Long.MAX_VALUE) {
+      listBasedOnFilters.addFilter(
+          TimelineFilterUtils.createSingleColValueFiltersByRange(
+          ApplicationColumn.CREATED_TIME, createdTimeBegin, createdTimeEnd));
+    }
+    // Create filter list based on metric filters and add it to
+    // listBasedOnFilters.
+    TimelineFilterList metricFilters = filters.getMetricFilters();
+    if (metricFilters != null && !metricFilters.getFilterList().isEmpty()) {
+      listBasedOnFilters.addFilter(
+          TimelineFilterUtils.createHBaseFilterList(
+              ApplicationColumnPrefix.METRIC, metricFilters));
+    }
+    // Create filter list based on config filters and add it to
+    // listBasedOnFilters.
+    TimelineFilterList configFilters = filters.getConfigFilters();
+    if (configFilters != null && !configFilters.getFilterList().isEmpty()) {
+      listBasedOnFilters.addFilter(
+          TimelineFilterUtils.createHBaseFilterList(
+              ApplicationColumnPrefix.CONFIG, configFilters));
+    }
+    // Create filter list based on info filters and add it to 
listBasedOnFilters
+    TimelineFilterList infoFilters = filters.getInfoFilters();
+    if (infoFilters != null && !infoFilters.getFilterList().isEmpty()) {
+      listBasedOnFilters.addFilter(
+          TimelineFilterUtils.createHBaseFilterList(
+              ApplicationColumnPrefix.INFO, infoFilters));
+    }
+    return listBasedOnFilters;
+  }
+
+  /**
+   * Add {@link QualifierFilter} filters to filter list for each column of
+   * application table.
+   *
+   * @param list filter list to which qualifier filters have to be added.
+   */
+  @Override
+  protected void updateFixedColumns(FilterList list) {
+    for (ApplicationColumn column : ApplicationColumn.values()) {
+      list.addFilter(new QualifierFilter(CompareOp.EQUAL,
+          new BinaryComparator(column.getColumnQualifierBytes())));
+    }
+  }
+
+  /**
+   * Creates a filter list which indicates that only some of the column
+   * qualifiers in the info column family will be returned in result.
+   *
+   * @return filter list.
+   * @throws IOException if any problem occurs while creating filter list.
+   */
+  private FilterList createFilterListForColsOfInfoFamily()
+      throws IOException {
+    FilterList infoFamilyColsFilter = new FilterList(Operator.MUST_PASS_ONE);
+    // Add filters for each column in entity table.
+    updateFixedColumns(infoFamilyColsFilter);
+    EnumSet<Field> fieldsToRetrieve = 
getDataToRetrieve().getFieldsToRetrieve();
+    // If INFO field has to be retrieved, add a filter for fetching columns
+    // with INFO column prefix.
+    if (hasField(fieldsToRetrieve, Field.INFO)) {
+      infoFamilyColsFilter.addFilter(
+          TimelineFilterUtils.createHBaseQualifierFilter(
+              CompareOp.EQUAL, ApplicationColumnPrefix.INFO));
+    }
+    TimelineFilterList relatesTo = getFilters().getRelatesTo();
+    if (hasField(fieldsToRetrieve, Field.RELATES_TO)) {
+      // If RELATES_TO field has to be retrieved, add a filter for fetching
+      // columns with RELATES_TO column prefix.
+      infoFamilyColsFilter.addFilter(
+          TimelineFilterUtils.createHBaseQualifierFilter(
+              CompareOp.EQUAL, ApplicationColumnPrefix.RELATES_TO));
+    } else if (relatesTo != null && !relatesTo.getFilterList().isEmpty()) {
+      // Even if fields to retrieve does not contain RELATES_TO, we still
+      // need to have a filter to fetch some of the column qualifiers if
+      // relatesTo filters are specified. relatesTo filters will then be
+      // matched after fetching rows from HBase.
+      Set<String> relatesToCols =
+          TimelineFilterUtils.fetchColumnsFromFilterList(relatesTo);
+      infoFamilyColsFilter.addFilter(createFiltersFromColumnQualifiers(
+          ApplicationColumnPrefix.RELATES_TO, relatesToCols));
+    }
+    TimelineFilterList isRelatedTo = getFilters().getIsRelatedTo();
+    if (hasField(fieldsToRetrieve, Field.IS_RELATED_TO)) {
+      // If IS_RELATED_TO field has to be retrieved, add a filter for fetching
+      // columns with IS_RELATED_TO column prefix.
+      infoFamilyColsFilter.addFilter(
+          TimelineFilterUtils.createHBaseQualifierFilter(
+              CompareOp.EQUAL, ApplicationColumnPrefix.IS_RELATED_TO));
+    } else if (isRelatedTo != null && !isRelatedTo.getFilterList().isEmpty()) {
+      // Even if fields to retrieve does not contain IS_RELATED_TO, we still
+      // need to have a filter to fetch some of the column qualifiers if
+      // isRelatedTo filters are specified. isRelatedTo filters will then be
+      // matched after fetching rows from HBase.
+      Set<String> isRelatedToCols =
+          TimelineFilterUtils.fetchColumnsFromFilterList(isRelatedTo);
+      infoFamilyColsFilter.addFilter(createFiltersFromColumnQualifiers(
+          ApplicationColumnPrefix.IS_RELATED_TO, isRelatedToCols));
+    }
+    TimelineFilterList eventFilters = getFilters().getEventFilters();
+    if (hasField(fieldsToRetrieve, Field.EVENTS)) {
+      // If EVENTS field has to be retrieved, add a filter for fetching columns
+      // with EVENT column prefix.
+      infoFamilyColsFilter.addFilter(
+          TimelineFilterUtils.createHBaseQualifierFilter(
+              CompareOp.EQUAL, ApplicationColumnPrefix.EVENT));
+    } else if (eventFilters != null && 
!eventFilters.getFilterList().isEmpty()){
+      // Even if fields to retrieve does not contain EVENTS, we still need to
+      // have a filter to fetch some of the column qualifiers on the basis of
+      // event filters specified. Event filters will then be matched after
+      // fetching rows from HBase.
+      Set<String> eventCols =
+          TimelineFilterUtils.fetchColumnsFromFilterList(eventFilters);
+      infoFamilyColsFilter.addFilter(createFiltersFromColumnQualifiers(
+          ApplicationColumnPrefix.EVENT, eventCols));
+    }
+    return infoFamilyColsFilter;
+  }
+
+  /**
+   * Exclude column prefixes via filters which are not required(based on fields
+   * to retrieve) from info column family. These filters are added to filter
+   * list which contains a filter for getting info column family.
+   *
+   * @param infoColFamilyList filter list for info column family.
+   */
+  private void excludeFieldsFromInfoColFamily(FilterList infoColFamilyList) {
+    EnumSet<Field> fieldsToRetrieve = 
getDataToRetrieve().getFieldsToRetrieve();
+    // Events not required.
+    if (!hasField(fieldsToRetrieve, Field.EVENTS)) {
+      infoColFamilyList.addFilter(
+          TimelineFilterUtils.createHBaseQualifierFilter(
+              CompareOp.NOT_EQUAL, ApplicationColumnPrefix.EVENT));
+    }
+    // info not required.
+    if (!hasField(fieldsToRetrieve, Field.INFO)) {
+      infoColFamilyList.addFilter(
+          TimelineFilterUtils.createHBaseQualifierFilter(
+              CompareOp.NOT_EQUAL, ApplicationColumnPrefix.INFO));
+    }
+    // is related to not required.
+    if (!hasField(fieldsToRetrieve, Field.IS_RELATED_TO)) {
+      infoColFamilyList.addFilter(
+          TimelineFilterUtils.createHBaseQualifierFilter(
+              CompareOp.NOT_EQUAL, ApplicationColumnPrefix.IS_RELATED_TO));
+    }
+    // relates to not required.
+    if (!hasField(fieldsToRetrieve, Field.RELATES_TO)) {
+      infoColFamilyList.addFilter(
+          TimelineFilterUtils.createHBaseQualifierFilter(
+              CompareOp.NOT_EQUAL, ApplicationColumnPrefix.RELATES_TO));
+    }
+  }
+
+  /**
+   * Updates filter list based on fields for confs and metrics to retrieve.
+   *
+   * @param listBasedOnFields filter list based on fields.
+   * @throws IOException if any problem occurs while updating filter list.
+   */
+  private void updateFilterForConfsAndMetricsToRetrieve(
+      FilterList listBasedOnFields) throws IOException {
+    TimelineDataToRetrieve dataToRetrieve = getDataToRetrieve();
+    // Please note that if confsToRetrieve is specified, we would have added
+    // CONFS to fields to retrieve in augmentParams() even if not specified.
+    if (dataToRetrieve.getFieldsToRetrieve().contains(Field.CONFIGS)) {
+      // Create a filter list for configs.
+      listBasedOnFields.addFilter(TimelineFilterUtils.
+          createFilterForConfsOrMetricsToRetrieve(
+              dataToRetrieve.getConfsToRetrieve(),
+              ApplicationColumnFamily.CONFIGS, 
ApplicationColumnPrefix.CONFIG));
+    }
+
+    // Please note that if metricsToRetrieve is specified, we would have added
+    // METRICS to fields to retrieve in augmentParams() even if not specified.
+    if (dataToRetrieve.getFieldsToRetrieve().contains(Field.METRICS)) {
+      // Create a filter list for metrics.
+      listBasedOnFields.addFilter(TimelineFilterUtils.
+          createFilterForConfsOrMetricsToRetrieve(
+              dataToRetrieve.getMetricsToRetrieve(),
+              ApplicationColumnFamily.METRICS, 
ApplicationColumnPrefix.METRIC));
+    }
+  }
+
+  @Override
+  protected FilterList constructFilterListBasedOnFields() throws IOException {
+    if (!needCreateFilterListBasedOnFields()) {
+      // Fetch all the columns. No need of a filter.
+      return null;
+    }
+    FilterList listBasedOnFields = new FilterList(Operator.MUST_PASS_ONE);
+    FilterList infoColFamilyList = new FilterList();
+    // By default fetch everything in INFO column family.
+    FamilyFilter infoColumnFamily =
+        new FamilyFilter(CompareOp.EQUAL,
+            new BinaryComparator(ApplicationColumnFamily.INFO.getBytes()));
+    infoColFamilyList.addFilter(infoColumnFamily);
+    if (!isSingleEntityRead() && fetchPartialColsFromInfoFamily()) {
+      // We can fetch only some of the columns from info family.
+      infoColFamilyList.addFilter(createFilterListForColsOfInfoFamily());
+    } else {
+      // Exclude column prefixes in info column family which are not required
+      // based on fields to retrieve.
+      excludeFieldsFromInfoColFamily(infoColFamilyList);
+    }
+    listBasedOnFields.addFilter(infoColFamilyList);
+
+    updateFilterForConfsAndMetricsToRetrieve(listBasedOnFields);
+    return listBasedOnFields;
+  }
+
+  @Override
+  protected Result getResult(Configuration hbaseConf, Connection conn,
+      FilterList filterList) throws IOException {
+    TimelineReaderContext context = getContext();
+    ApplicationRowKey applicationRowKey =
+        new ApplicationRowKey(context.getClusterId(), context.getUserId(),
+            context.getFlowName(), context.getFlowRunId(), context.getAppId());
+    byte[] rowKey = applicationRowKey.getRowKey();
+    Get get = new Get(rowKey);
+    get.setMaxVersions(getDataToRetrieve().getMetricsLimit());
+    if (filterList != null && !filterList.getFilters().isEmpty()) {
+      get.setFilter(filterList);
+    }
+    return getTable().getResult(hbaseConf, conn, get);
+  }
+
+  @Override
+  protected void validateParams() {
+    Preconditions.checkNotNull(getContext(), "context shouldn't be null");
+    Preconditions.checkNotNull(
+        getDataToRetrieve(), "data to retrieve shouldn't be null");
+    Preconditions.checkNotNull(getContext().getClusterId(),
+        "clusterId shouldn't be null");
+    Preconditions.checkNotNull(getContext().getEntityType(),
+        "entityType shouldn't be null");
+    if (isSingleEntityRead()) {
+      Preconditions.checkNotNull(getContext().getAppId(),
+          "appId shouldn't be null");
+    } else {
+      Preconditions.checkNotNull(getContext().getUserId(),
+          "userId shouldn't be null");
+      Preconditions.checkNotNull(getContext().getFlowName(),
+          "flowName shouldn't be null");
+    }
+  }
+
+  @Override
+  protected void augmentParams(Configuration hbaseConf, Connection conn)
+      throws IOException {
+    if (isSingleEntityRead()) {
+      // Get flow context information from AppToFlow table.
+      defaultAugmentParams(hbaseConf, conn);
+    }
+    // Add configs/metrics to fields to retrieve if confsToRetrieve and/or
+    // metricsToRetrieve are specified.
+    getDataToRetrieve().addFieldsBasedOnConfsAndMetricsToRetrieve();
+    if (!isSingleEntityRead()) {
+      createFiltersIfNull();
+    }
+  }
+
+  @Override
+  protected ResultScanner getResults(Configuration hbaseConf,
+      Connection conn, FilterList filterList) throws IOException {
+    Scan scan = new Scan();
+    TimelineReaderContext context = getContext();
+    RowKeyPrefix<ApplicationRowKey> applicationRowKeyPrefix = null;
+
+    // Whether or not flowRunID is null doesn't matter, the
+    // ApplicationRowKeyPrefix will do the right thing.
+    // default mode, will always scans from beginning of entity type.
+    if (getFilters().getFromId() == null) {
+      applicationRowKeyPrefix = new ApplicationRowKeyPrefix(
+          context.getClusterId(), context.getUserId(), context.getFlowName(),
+          context.getFlowRunId());
+      scan.setRowPrefixFilter(applicationRowKeyPrefix.getRowKeyPrefix());
+    } else {
+      Long flowRunId = context.getFlowRunId();
+      if (flowRunId == null) {
+        AppToFlowRowKey appToFlowRowKey = new AppToFlowRowKey(
+            getFilters().getFromId());
+        FlowContext flowContext = lookupFlowContext(appToFlowRowKey,
+            context.getClusterId(), hbaseConf, conn);
+        flowRunId = flowContext.getFlowRunId();
+      }
+
+      ApplicationRowKey applicationRowKey =
+          new ApplicationRowKey(context.getClusterId(), context.getUserId(),
+              context.getFlowName(), flowRunId, getFilters().getFromId());
+
+      // set start row
+      scan.setStartRow(applicationRowKey.getRowKey());
+
+      // get the bytes for stop row
+      applicationRowKeyPrefix = new ApplicationRowKeyPrefix(
+          context.getClusterId(), context.getUserId(), context.getFlowName(),
+          context.getFlowRunId());
+
+      // set stop row
+      scan.setStopRow(
+          HBaseTimelineStorageUtils.calculateTheClosestNextRowKeyForPrefix(
+              applicationRowKeyPrefix.getRowKeyPrefix()));
+    }
+
+    FilterList newList = new FilterList();
+    newList.addFilter(new PageFilter(getFilters().getLimit()));
+    if (filterList != null && !filterList.getFilters().isEmpty()) {
+      newList.addFilter(filterList);
+    }
+    scan.setFilter(newList);
+    scan.setMaxVersions(getDataToRetrieve().getMetricsLimit());
+    return getTable().getResultScanner(hbaseConf, conn, scan);
+  }
+
+  @Override
+  protected TimelineEntity parseEntity(Result result) throws IOException {
+    if (result == null || result.isEmpty()) {
+      return null;
+    }
+    TimelineEntity entity = new TimelineEntity();
+    entity.setType(TimelineEntityType.YARN_APPLICATION.toString());
+    String entityId = ApplicationColumn.ID.readResult(result).toString();
+    entity.setId(entityId);
+
+    TimelineEntityFilters filters = getFilters();
+    // fetch created time
+    Long createdTime = (Long) 
ApplicationColumn.CREATED_TIME.readResult(result);
+    entity.setCreatedTime(createdTime);
+
+    EnumSet<Field> fieldsToRetrieve = 
getDataToRetrieve().getFieldsToRetrieve();
+    // fetch is related to entities and match isRelatedTo filter. If 
isRelatedTo
+    // filters do not match, entity would be dropped. We have to match filters
+    // locally as relevant HBase filters to filter out rows on the basis of
+    // isRelatedTo are not set in HBase scan.
+    boolean checkIsRelatedTo =
+        !isSingleEntityRead() && filters.getIsRelatedTo() != null &&
+        filters.getIsRelatedTo().getFilterList().size() > 0;
+    if (hasField(fieldsToRetrieve, Field.IS_RELATED_TO) || checkIsRelatedTo) {
+      readRelationship(entity, result, ApplicationColumnPrefix.IS_RELATED_TO,
+          true);
+      if (checkIsRelatedTo && !TimelineStorageUtils.matchIsRelatedTo(entity,
+          filters.getIsRelatedTo())) {
+        return null;
+      }
+      if (!hasField(fieldsToRetrieve,
+          Field.IS_RELATED_TO)) {
+        entity.getIsRelatedToEntities().clear();
+      }
+    }
+
+    // fetch relates to entities and match relatesTo filter. If relatesTo
+    // filters do not match, entity would be dropped. We have to match filters
+    // locally as relevant HBase filters to filter out rows on the basis of
+    // relatesTo are not set in HBase scan.
+    boolean checkRelatesTo =
+        !isSingleEntityRead() && filters.getRelatesTo() != null &&
+        filters.getRelatesTo().getFilterList().size() > 0;
+    if (hasField(fieldsToRetrieve, Field.RELATES_TO) ||
+        checkRelatesTo) {
+      readRelationship(entity, result, ApplicationColumnPrefix.RELATES_TO,
+          false);
+      if (checkRelatesTo && !TimelineStorageUtils.matchRelatesTo(entity,
+          filters.getRelatesTo())) {
+        return null;
+      }
+      if (!hasField(fieldsToRetrieve, Field.RELATES_TO)) {
+        entity.getRelatesToEntities().clear();
+      }
+    }
+
+    // fetch info if fieldsToRetrieve contains INFO or ALL.
+    if (hasField(fieldsToRetrieve, Field.INFO)) {
+      readKeyValuePairs(entity, result, ApplicationColumnPrefix.INFO, false);
+    }
+
+    // fetch configs if fieldsToRetrieve contains CONFIGS or ALL.
+    if (hasField(fieldsToRetrieve, Field.CONFIGS)) {
+      readKeyValuePairs(entity, result, ApplicationColumnPrefix.CONFIG, true);
+    }
+
+    // fetch events and match event filters if they exist. If event filters do
+    // not match, entity would be dropped. We have to match filters locally
+    // as relevant HBase filters to filter out rows on the basis of events
+    // are not set in HBase scan.
+    boolean checkEvents =
+        !isSingleEntityRead() && filters.getEventFilters() != null &&
+        filters.getEventFilters().getFilterList().size() > 0;
+    if (hasField(fieldsToRetrieve, Field.EVENTS) || checkEvents) {
+      readEvents(entity, result, ApplicationColumnPrefix.EVENT);
+      if (checkEvents && !TimelineStorageUtils.matchEventFilters(entity,
+          filters.getEventFilters())) {
+        return null;
+      }
+      if (!hasField(fieldsToRetrieve, Field.EVENTS)) {
+        entity.getEvents().clear();
+      }
+    }
+
+    // fetch metrics if fieldsToRetrieve contains METRICS or ALL.
+    if (hasField(fieldsToRetrieve, Field.METRICS)) {
+      readMetrics(entity, result, ApplicationColumnPrefix.METRIC);
+    }
+    return entity;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b92089c0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/EntityTypeReader.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/EntityTypeReader.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/EntityTypeReader.java
new file mode 100644
index 0000000..fd85878
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/EntityTypeReader.java
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.reader;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
+import org.apache.hadoop.hbase.filter.KeyOnlyFilter;
+import org.apache.hadoop.hbase.filter.PageFilter;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import 
org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
+import 
org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+import 
org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey;
+import 
org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKeyPrefix;
+import 
org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Set;
+import java.util.TreeSet;
+
+/**
+ * Timeline entity reader for listing all available entity types given one
+ * reader context. Right now only supports listing all entity types within one
+ * YARN application.
+ */
+public final class EntityTypeReader extends AbstractTimelineStorageReader {
+
+  private static final Log LOG = LogFactory.getLog(EntityTypeReader.class);
+  private static final EntityTable ENTITY_TABLE = new EntityTable();
+
+  public EntityTypeReader(TimelineReaderContext context) {
+    super(context);
+  }
+
+  /**
+   * Reads a set of timeline entity types from the HBase storage for the given
+   * context.
+   *
+   * @param hbaseConf HBase Configuration.
+   * @param conn HBase Connection.
+   * @return a set of <cite>TimelineEntity</cite> objects, with only type field
+   *         set.
+   * @throws IOException if any exception is encountered while reading 
entities.
+   */
+  public Set<String> readEntityTypes(Configuration hbaseConf,
+      Connection conn) throws IOException {
+
+    validateParams();
+    augmentParams(hbaseConf, conn);
+
+    Set<String> types = new TreeSet<>();
+    TimelineReaderContext context = getContext();
+    EntityRowKeyPrefix prefix = new EntityRowKeyPrefix(context.getClusterId(),
+        context.getUserId(), context.getFlowName(), context.getFlowRunId(),
+        context.getAppId());
+    byte[] currRowKey = prefix.getRowKeyPrefix();
+    byte[] nextRowKey = prefix.getRowKeyPrefix();
+    nextRowKey[nextRowKey.length - 1]++;
+
+    FilterList typeFilterList = new FilterList();
+    typeFilterList.addFilter(new FirstKeyOnlyFilter());
+    typeFilterList.addFilter(new KeyOnlyFilter());
+    typeFilterList.addFilter(new PageFilter(1));
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("FilterList created for scan is - " + typeFilterList);
+    }
+
+    int counter = 0;
+    while (true) {
+      try (ResultScanner results
+          = getResult(hbaseConf, conn, typeFilterList, currRowKey, nextRowKey))
+      {
+        TimelineEntity entity = parseEntityForType(results.next());
+        if (entity == null) {
+          break;
+        }
+        ++counter;
+        if (!types.add(entity.getType())) {
+          LOG.warn("Failed to add type " + entity.getType()
+              + " to the result set because there is a duplicated copy. ");
+        }
+        String currType = entity.getType();
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Current row key: " + Arrays.toString(currRowKey));
+          LOG.debug("New entity type discovered: " + currType);
+        }
+        currRowKey = getNextRowKey(prefix.getRowKeyPrefix(), currType);
+      }
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Scanned " + counter + "records for "
+          + types.size() + "types");
+    }
+    return types;
+  }
+
+  @Override
+  protected void validateParams() {
+    Preconditions.checkNotNull(getContext(), "context shouldn't be null");
+    Preconditions.checkNotNull(getContext().getClusterId(),
+        "clusterId shouldn't be null");
+    Preconditions.checkNotNull(getContext().getAppId(),
+        "appId shouldn't be null");
+  }
+
+  /**
+   * Gets the possibly next row key prefix given current prefix and type.
+   *
+   * @param currRowKeyPrefix The current prefix that contains user, cluster,
+   *                         flow, run, and application id.
+   * @param entityType Current entity type.
+   * @return A new prefix for the possibly immediately next row key.
+   */
+  private static byte[] getNextRowKey(byte[] currRowKeyPrefix,
+      String entityType) {
+    if (currRowKeyPrefix == null || entityType == null) {
+      return null;
+    }
+
+    byte[] entityTypeEncoded = Separator.QUALIFIERS.join(
+        Separator.encode(entityType, Separator.SPACE, Separator.TAB,
+            Separator.QUALIFIERS),
+        Separator.EMPTY_BYTES);
+
+    byte[] currRowKey
+        = new byte[currRowKeyPrefix.length + entityTypeEncoded.length];
+    System.arraycopy(currRowKeyPrefix, 0, currRowKey, 0,
+        currRowKeyPrefix.length);
+    System.arraycopy(entityTypeEncoded, 0, currRowKey, currRowKeyPrefix.length,
+        entityTypeEncoded.length);
+
+    return HBaseTimelineStorageUtils.calculateTheClosestNextRowKeyForPrefix(
+        currRowKey);
+  }
+
+  private ResultScanner getResult(Configuration hbaseConf, Connection conn,
+      FilterList filterList, byte[] startPrefix, byte[] endPrefix)
+      throws IOException {
+    Scan scan = new Scan(startPrefix, endPrefix);
+    scan.setFilter(filterList);
+    scan.setSmall(true);
+    return ENTITY_TABLE.getResultScanner(hbaseConf, conn, scan);
+  }
+
+  private TimelineEntity parseEntityForType(Result result)
+      throws IOException {
+    if (result == null || result.isEmpty()) {
+      return null;
+    }
+    TimelineEntity entity = new TimelineEntity();
+    EntityRowKey newRowKey = EntityRowKey.parseRowKey(result.getRow());
+    entity.setType(newRowKey.getEntityType());
+    return entity;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b92089c0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java
new file mode 100644
index 0000000..c741d0e
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage.reader;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.filter.PageFilter;
+import org.apache.hadoop.yarn.api.records.timelineservice.FlowActivityEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import 
org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve;
+import 
org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters;
+import 
org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
+import 
org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
+import 
org.apache.hadoop.yarn.server.timelineservice.storage.common.LongKeyConverter;
+import 
org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityColumnPrefix;
+import 
org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKey;
+import 
org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKeyPrefix;
+import 
org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTable;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Timeline entity reader for flow activity entities that are stored in the
+ * flow activity table.
+ */
+class FlowActivityEntityReader extends TimelineEntityReader {
+  private static final FlowActivityTable FLOW_ACTIVITY_TABLE =
+      new FlowActivityTable();
+
+  /**
+   * Used to convert Long key components to and from storage format.
+   */
+  private final KeyConverter<Long> longKeyConverter = new LongKeyConverter();
+
+
+  public FlowActivityEntityReader(TimelineReaderContext ctxt,
+      TimelineEntityFilters entityFilters, TimelineDataToRetrieve toRetrieve) {
+    super(ctxt, entityFilters, toRetrieve);
+  }
+
+  public FlowActivityEntityReader(TimelineReaderContext ctxt,
+      TimelineDataToRetrieve toRetrieve) {
+    super(ctxt, toRetrieve);
+  }
+
+  /**
+   * Uses the {@link FlowActivityTable}.
+   */
+  @Override
+  protected BaseTable<?> getTable() {
+    return FLOW_ACTIVITY_TABLE;
+  }
+
+  @Override
+  protected void validateParams() {
+    Preconditions.checkNotNull(getContext().getClusterId(),
+        "clusterId shouldn't be null");
+  }
+
+  @Override
+  protected void augmentParams(Configuration hbaseConf, Connection conn)
+      throws IOException {
+    createFiltersIfNull();
+  }
+
+  @Override
+  protected FilterList constructFilterListBasedOnFilters() throws IOException {
+    return null;
+  }
+
+  @Override
+  protected FilterList constructFilterListBasedOnFields() {
+    return null;
+  }
+
+  @Override
+  protected Result getResult(Configuration hbaseConf, Connection conn,
+      FilterList filterList) throws IOException {
+    throw new UnsupportedOperationException(
+        "we don't support a single entity query");
+  }
+
+  @Override
+  protected ResultScanner getResults(Configuration hbaseConf,
+      Connection conn, FilterList filterList) throws IOException {
+    Scan scan = new Scan();
+    String clusterId = getContext().getClusterId();
+    if (getFilters().getCreatedTimeBegin() == 0L &&
+        getFilters().getCreatedTimeEnd() == Long.MAX_VALUE) {
+       // All records have to be chosen.
+      scan.setRowPrefixFilter(new FlowActivityRowKeyPrefix(clusterId)
+          .getRowKeyPrefix());
+    } else {
+      scan.setStartRow(new FlowActivityRowKeyPrefix(clusterId, getFilters()
+          .getCreatedTimeEnd()).getRowKeyPrefix());
+      scan.setStopRow(new FlowActivityRowKeyPrefix(clusterId, (getFilters()
+          .getCreatedTimeBegin() <= 0 ? 0
+          : (getFilters().getCreatedTimeBegin() - 1))).getRowKeyPrefix());
+    }
+    // use the page filter to limit the result to the page size
+    // the scanner may still return more than the limit; therefore we need to
+    // read the right number as we iterate
+    scan.setFilter(new PageFilter(getFilters().getLimit()));
+    return getTable().getResultScanner(hbaseConf, conn, scan);
+  }
+
+  @Override
+  protected TimelineEntity parseEntity(Result result) throws IOException {
+    FlowActivityRowKey rowKey = 
FlowActivityRowKey.parseRowKey(result.getRow());
+
+    Long time = rowKey.getDayTimestamp();
+    String user = rowKey.getUserId();
+    String flowName = rowKey.getFlowName();
+
+    FlowActivityEntity flowActivity = new FlowActivityEntity(
+        getContext().getClusterId(), time, user, flowName);
+    // set the id
+    flowActivity.setId(flowActivity.getId());
+    // get the list of run ids along with the version that are associated with
+    // this flow on this day
+    Map<Long, Object> runIdsMap =
+        FlowActivityColumnPrefix.RUN_ID.readResults(result, longKeyConverter);
+    for (Map.Entry<Long, Object> e : runIdsMap.entrySet()) {
+      Long runId = e.getKey();
+      String version = (String)e.getValue();
+      FlowRunEntity flowRun = new FlowRunEntity();
+      flowRun.setUser(user);
+      flowRun.setName(flowName);
+      flowRun.setRunId(runId);
+      flowRun.setVersion(version);
+      // set the id
+      flowRun.setId(flowRun.getId());
+      flowActivity.addFlowRun(flowRun);
+    }
+
+    return flowActivity;
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to