http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java
deleted file mode 100644
index 0edd6a5..0000000
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java
+++ /dev/null
@@ -1,520 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.timelineservice.storage.reader;
-
-import java.io.IOException;
-import java.util.EnumSet;
-import java.util.Set;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Query;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.filter.BinaryComparator;
-import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
-import org.apache.hadoop.hbase.filter.FamilyFilter;
-import org.apache.hadoop.hbase.filter.FilterList;
-import org.apache.hadoop.hbase.filter.FilterList.Operator;
-import org.apache.hadoop.hbase.filter.PageFilter;
-import org.apache.hadoop.hbase.filter.QualifierFilter;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
-import 
org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve;
-import 
org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters;
-import 
org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
-import 
org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderUtils;
-import 
org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
-import 
org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterUtils;
-import 
org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
-import 
org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumn;
-import 
org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnFamily;
-import 
org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix;
-import 
org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey;
-import 
org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKeyPrefix;
-import 
org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
-import 
org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
-import 
org.apache.hadoop.yarn.server.timelineservice.storage.common.RowKeyPrefix;
-import 
org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
-import org.apache.hadoop.yarn.webapp.BadRequestException;
-
-import com.google.common.base.Preconditions;
-
-/**
- * Timeline entity reader for application entities that are stored in the
- * application table.
- */
-class ApplicationEntityReader extends GenericEntityReader {
-  private static final ApplicationTable APPLICATION_TABLE =
-      new ApplicationTable();
-
-  public ApplicationEntityReader(TimelineReaderContext ctxt,
-      TimelineEntityFilters entityFilters, TimelineDataToRetrieve toRetrieve) {
-    super(ctxt, entityFilters, toRetrieve);
-  }
-
-  public ApplicationEntityReader(TimelineReaderContext ctxt,
-      TimelineDataToRetrieve toRetrieve) {
-    super(ctxt, toRetrieve);
-  }
-
-  /**
-   * Uses the {@link ApplicationTable}.
-   */
-  protected BaseTable<?> getTable() {
-    return APPLICATION_TABLE;
-  }
-
-  /**
-   * This method is called only for multiple entity reads.
-   */
-  @Override
-  protected FilterList constructFilterListBasedOnFilters() throws IOException {
-    // Filters here cannot be null for multiple entity reads as they are set in
-    // augmentParams if null.
-    TimelineEntityFilters filters = getFilters();
-    FilterList listBasedOnFilters = new FilterList();
-    // Create filter list based on created time range and add it to
-    // listBasedOnFilters.
-    long createdTimeBegin = filters.getCreatedTimeBegin();
-    long createdTimeEnd = filters.getCreatedTimeEnd();
-    if (createdTimeBegin != 0 || createdTimeEnd != Long.MAX_VALUE) {
-      listBasedOnFilters.addFilter(
-          TimelineFilterUtils.createSingleColValueFiltersByRange(
-          ApplicationColumn.CREATED_TIME, createdTimeBegin, createdTimeEnd));
-    }
-    // Create filter list based on metric filters and add it to
-    // listBasedOnFilters.
-    TimelineFilterList metricFilters = filters.getMetricFilters();
-    if (metricFilters != null && !metricFilters.getFilterList().isEmpty()) {
-      listBasedOnFilters.addFilter(
-          TimelineFilterUtils.createHBaseFilterList(
-              ApplicationColumnPrefix.METRIC, metricFilters));
-    }
-    // Create filter list based on config filters and add it to
-    // listBasedOnFilters.
-    TimelineFilterList configFilters = filters.getConfigFilters();
-    if (configFilters != null && !configFilters.getFilterList().isEmpty()) {
-      listBasedOnFilters.addFilter(
-          TimelineFilterUtils.createHBaseFilterList(
-              ApplicationColumnPrefix.CONFIG, configFilters));
-    }
-    // Create filter list based on info filters and add it to 
listBasedOnFilters
-    TimelineFilterList infoFilters = filters.getInfoFilters();
-    if (infoFilters != null && !infoFilters.getFilterList().isEmpty()) {
-      listBasedOnFilters.addFilter(
-          TimelineFilterUtils.createHBaseFilterList(
-              ApplicationColumnPrefix.INFO, infoFilters));
-    }
-    return listBasedOnFilters;
-  }
-
-  /**
-   * Add {@link QualifierFilter} filters to filter list for each column of
-   * application table.
-   *
-   * @param list filter list to which qualifier filters have to be added.
-   */
-  @Override
-  protected void updateFixedColumns(FilterList list) {
-    for (ApplicationColumn column : ApplicationColumn.values()) {
-      list.addFilter(new QualifierFilter(CompareOp.EQUAL,
-          new BinaryComparator(column.getColumnQualifierBytes())));
-    }
-  }
-
-  /**
-   * Creates a filter list which indicates that only some of the column
-   * qualifiers in the info column family will be returned in result.
-   *
-   * @return filter list.
-   * @throws IOException if any problem occurs while creating filter list.
-   */
-  private FilterList createFilterListForColsOfInfoFamily()
-      throws IOException {
-    FilterList infoFamilyColsFilter = new FilterList(Operator.MUST_PASS_ONE);
-    // Add filters for each column in entity table.
-    updateFixedColumns(infoFamilyColsFilter);
-    EnumSet<Field> fieldsToRetrieve = 
getDataToRetrieve().getFieldsToRetrieve();
-    // If INFO field has to be retrieved, add a filter for fetching columns
-    // with INFO column prefix.
-    if (hasField(fieldsToRetrieve, Field.INFO)) {
-      infoFamilyColsFilter.addFilter(
-          TimelineFilterUtils.createHBaseQualifierFilter(
-              CompareOp.EQUAL, ApplicationColumnPrefix.INFO));
-    }
-    TimelineFilterList relatesTo = getFilters().getRelatesTo();
-    if (hasField(fieldsToRetrieve, Field.RELATES_TO)) {
-      // If RELATES_TO field has to be retrieved, add a filter for fetching
-      // columns with RELATES_TO column prefix.
-      infoFamilyColsFilter.addFilter(
-          TimelineFilterUtils.createHBaseQualifierFilter(
-              CompareOp.EQUAL, ApplicationColumnPrefix.RELATES_TO));
-    } else if (relatesTo != null && !relatesTo.getFilterList().isEmpty()) {
-      // Even if fields to retrieve does not contain RELATES_TO, we still
-      // need to have a filter to fetch some of the column qualifiers if
-      // relatesTo filters are specified. relatesTo filters will then be
-      // matched after fetching rows from HBase.
-      Set<String> relatesToCols =
-          TimelineFilterUtils.fetchColumnsFromFilterList(relatesTo);
-      infoFamilyColsFilter.addFilter(createFiltersFromColumnQualifiers(
-          ApplicationColumnPrefix.RELATES_TO, relatesToCols));
-    }
-    TimelineFilterList isRelatedTo = getFilters().getIsRelatedTo();
-    if (hasField(fieldsToRetrieve, Field.IS_RELATED_TO)) {
-      // If IS_RELATED_TO field has to be retrieved, add a filter for fetching
-      // columns with IS_RELATED_TO column prefix.
-      infoFamilyColsFilter.addFilter(
-          TimelineFilterUtils.createHBaseQualifierFilter(
-              CompareOp.EQUAL, ApplicationColumnPrefix.IS_RELATED_TO));
-    } else if (isRelatedTo != null && !isRelatedTo.getFilterList().isEmpty()) {
-      // Even if fields to retrieve does not contain IS_RELATED_TO, we still
-      // need to have a filter to fetch some of the column qualifiers if
-      // isRelatedTo filters are specified. isRelatedTo filters will then be
-      // matched after fetching rows from HBase.
-      Set<String> isRelatedToCols =
-          TimelineFilterUtils.fetchColumnsFromFilterList(isRelatedTo);
-      infoFamilyColsFilter.addFilter(createFiltersFromColumnQualifiers(
-          ApplicationColumnPrefix.IS_RELATED_TO, isRelatedToCols));
-    }
-    TimelineFilterList eventFilters = getFilters().getEventFilters();
-    if (hasField(fieldsToRetrieve, Field.EVENTS)) {
-      // If EVENTS field has to be retrieved, add a filter for fetching columns
-      // with EVENT column prefix.
-      infoFamilyColsFilter.addFilter(
-          TimelineFilterUtils.createHBaseQualifierFilter(
-              CompareOp.EQUAL, ApplicationColumnPrefix.EVENT));
-    } else if (eventFilters != null && 
!eventFilters.getFilterList().isEmpty()){
-      // Even if fields to retrieve does not contain EVENTS, we still need to
-      // have a filter to fetch some of the column qualifiers on the basis of
-      // event filters specified. Event filters will then be matched after
-      // fetching rows from HBase.
-      Set<String> eventCols =
-          TimelineFilterUtils.fetchColumnsFromFilterList(eventFilters);
-      infoFamilyColsFilter.addFilter(createFiltersFromColumnQualifiers(
-          ApplicationColumnPrefix.EVENT, eventCols));
-    }
-    return infoFamilyColsFilter;
-  }
-
-  /**
-   * Exclude column prefixes via filters which are not required(based on fields
-   * to retrieve) from info column family. These filters are added to filter
-   * list which contains a filter for getting info column family.
-   *
-   * @param infoColFamilyList filter list for info column family.
-   */
-  private void excludeFieldsFromInfoColFamily(FilterList infoColFamilyList) {
-    EnumSet<Field> fieldsToRetrieve = 
getDataToRetrieve().getFieldsToRetrieve();
-    // Events not required.
-    if (!hasField(fieldsToRetrieve, Field.EVENTS)) {
-      infoColFamilyList.addFilter(
-          TimelineFilterUtils.createHBaseQualifierFilter(
-              CompareOp.NOT_EQUAL, ApplicationColumnPrefix.EVENT));
-    }
-    // info not required.
-    if (!hasField(fieldsToRetrieve, Field.INFO)) {
-      infoColFamilyList.addFilter(
-          TimelineFilterUtils.createHBaseQualifierFilter(
-              CompareOp.NOT_EQUAL, ApplicationColumnPrefix.INFO));
-    }
-    // is related to not required.
-    if (!hasField(fieldsToRetrieve, Field.IS_RELATED_TO)) {
-      infoColFamilyList.addFilter(
-          TimelineFilterUtils.createHBaseQualifierFilter(
-              CompareOp.NOT_EQUAL, ApplicationColumnPrefix.IS_RELATED_TO));
-    }
-    // relates to not required.
-    if (!hasField(fieldsToRetrieve, Field.RELATES_TO)) {
-      infoColFamilyList.addFilter(
-          TimelineFilterUtils.createHBaseQualifierFilter(
-              CompareOp.NOT_EQUAL, ApplicationColumnPrefix.RELATES_TO));
-    }
-  }
-
-  /**
-   * Updates filter list based on fields for confs and metrics to retrieve.
-   *
-   * @param listBasedOnFields filter list based on fields.
-   * @throws IOException if any problem occurs while updating filter list.
-   */
-  private void updateFilterForConfsAndMetricsToRetrieve(
-      FilterList listBasedOnFields) throws IOException {
-    TimelineDataToRetrieve dataToRetrieve = getDataToRetrieve();
-    // Please note that if confsToRetrieve is specified, we would have added
-    // CONFS to fields to retrieve in augmentParams() even if not specified.
-    if (dataToRetrieve.getFieldsToRetrieve().contains(Field.CONFIGS)) {
-      // Create a filter list for configs.
-      listBasedOnFields.addFilter(TimelineFilterUtils.
-          createFilterForConfsOrMetricsToRetrieve(
-              dataToRetrieve.getConfsToRetrieve(),
-              ApplicationColumnFamily.CONFIGS, 
ApplicationColumnPrefix.CONFIG));
-    }
-
-    // Please note that if metricsToRetrieve is specified, we would have added
-    // METRICS to fields to retrieve in augmentParams() even if not specified.
-    if (dataToRetrieve.getFieldsToRetrieve().contains(Field.METRICS)) {
-      // Create a filter list for metrics.
-      listBasedOnFields.addFilter(TimelineFilterUtils.
-          createFilterForConfsOrMetricsToRetrieve(
-              dataToRetrieve.getMetricsToRetrieve(),
-              ApplicationColumnFamily.METRICS, 
ApplicationColumnPrefix.METRIC));
-    }
-  }
-
-  @Override
-  protected FilterList constructFilterListBasedOnFields() throws IOException {
-    if (!needCreateFilterListBasedOnFields()) {
-      // Fetch all the columns. No need of a filter.
-      return null;
-    }
-    FilterList listBasedOnFields = new FilterList(Operator.MUST_PASS_ONE);
-    FilterList infoColFamilyList = new FilterList();
-    // By default fetch everything in INFO column family.
-    FamilyFilter infoColumnFamily =
-        new FamilyFilter(CompareOp.EQUAL,
-            new BinaryComparator(ApplicationColumnFamily.INFO.getBytes()));
-    infoColFamilyList.addFilter(infoColumnFamily);
-    if (!isSingleEntityRead() && fetchPartialColsFromInfoFamily()) {
-      // We can fetch only some of the columns from info family.
-      infoColFamilyList.addFilter(createFilterListForColsOfInfoFamily());
-    } else {
-      // Exclude column prefixes in info column family which are not required
-      // based on fields to retrieve.
-      excludeFieldsFromInfoColFamily(infoColFamilyList);
-    }
-    listBasedOnFields.addFilter(infoColFamilyList);
-
-    updateFilterForConfsAndMetricsToRetrieve(listBasedOnFields);
-    return listBasedOnFields;
-  }
-
-  @Override
-  protected Result getResult(Configuration hbaseConf, Connection conn,
-      FilterList filterList) throws IOException {
-    TimelineReaderContext context = getContext();
-    ApplicationRowKey applicationRowKey =
-        new ApplicationRowKey(context.getClusterId(), context.getUserId(),
-            context.getFlowName(), context.getFlowRunId(), context.getAppId());
-    byte[] rowKey = applicationRowKey.getRowKey();
-    Get get = new Get(rowKey);
-    // Set time range for metric values.
-    setMetricsTimeRange(get);
-    get.setMaxVersions(getDataToRetrieve().getMetricsLimit());
-    if (filterList != null && !filterList.getFilters().isEmpty()) {
-      get.setFilter(filterList);
-    }
-    return getTable().getResult(hbaseConf, conn, get);
-  }
-
-  @Override
-  protected void validateParams() {
-    Preconditions.checkNotNull(getContext(), "context shouldn't be null");
-    Preconditions.checkNotNull(
-        getDataToRetrieve(), "data to retrieve shouldn't be null");
-    Preconditions.checkNotNull(getContext().getClusterId(),
-        "clusterId shouldn't be null");
-    Preconditions.checkNotNull(getContext().getEntityType(),
-        "entityType shouldn't be null");
-    if (isSingleEntityRead()) {
-      Preconditions.checkNotNull(getContext().getAppId(),
-          "appId shouldn't be null");
-    } else {
-      Preconditions.checkNotNull(getContext().getUserId(),
-          "userId shouldn't be null");
-      Preconditions.checkNotNull(getContext().getFlowName(),
-          "flowName shouldn't be null");
-    }
-  }
-
-  @Override
-  protected void augmentParams(Configuration hbaseConf, Connection conn)
-      throws IOException {
-    if (isSingleEntityRead()) {
-      // Get flow context information from AppToFlow table.
-      defaultAugmentParams(hbaseConf, conn);
-    }
-    // Add configs/metrics to fields to retrieve if confsToRetrieve and/or
-    // metricsToRetrieve are specified.
-    getDataToRetrieve().addFieldsBasedOnConfsAndMetricsToRetrieve();
-    if (!isSingleEntityRead()) {
-      createFiltersIfNull();
-    }
-  }
-
-  private void setMetricsTimeRange(Query query) {
-    // Set time range for metric values.
-    HBaseTimelineStorageUtils.setMetricsTimeRange(
-        query, ApplicationColumnFamily.METRICS.getBytes(),
-        getDataToRetrieve().getMetricsTimeBegin(),
-        getDataToRetrieve().getMetricsTimeEnd());
-  }
-
-  @Override
-  protected ResultScanner getResults(Configuration hbaseConf,
-      Connection conn, FilterList filterList) throws IOException {
-    Scan scan = new Scan();
-    TimelineReaderContext context = getContext();
-    RowKeyPrefix<ApplicationRowKey> applicationRowKeyPrefix = null;
-
-    // Whether or not flowRunID is null doesn't matter, the
-    // ApplicationRowKeyPrefix will do the right thing.
-    // default mode, will always scans from beginning of entity type.
-    if (getFilters().getFromId() == null) {
-      applicationRowKeyPrefix = new ApplicationRowKeyPrefix(
-          context.getClusterId(), context.getUserId(), context.getFlowName(),
-          context.getFlowRunId());
-      scan.setRowPrefixFilter(applicationRowKeyPrefix.getRowKeyPrefix());
-    } else {
-      ApplicationRowKey applicationRowKey = null;
-      try {
-        applicationRowKey =
-            ApplicationRowKey.parseRowKeyFromString(getFilters().getFromId());
-      } catch (IllegalArgumentException e) {
-        throw new BadRequestException("Invalid filter fromid is provided.");
-      }
-      if (!context.getClusterId().equals(applicationRowKey.getClusterId())) {
-        throw new BadRequestException(
-            "fromid doesn't belong to clusterId=" + context.getClusterId());
-      }
-
-      // set start row
-      scan.setStartRow(applicationRowKey.getRowKey());
-
-      // get the bytes for stop row
-      applicationRowKeyPrefix = new ApplicationRowKeyPrefix(
-          context.getClusterId(), context.getUserId(), context.getFlowName(),
-          context.getFlowRunId());
-
-      // set stop row
-      scan.setStopRow(
-          HBaseTimelineStorageUtils.calculateTheClosestNextRowKeyForPrefix(
-              applicationRowKeyPrefix.getRowKeyPrefix()));
-    }
-
-    FilterList newList = new FilterList();
-    newList.addFilter(new PageFilter(getFilters().getLimit()));
-    if (filterList != null && !filterList.getFilters().isEmpty()) {
-      newList.addFilter(filterList);
-    }
-    scan.setFilter(newList);
-
-    // Set time range for metric values.
-    setMetricsTimeRange(scan);
-    scan.setMaxVersions(getDataToRetrieve().getMetricsLimit());
-    return getTable().getResultScanner(hbaseConf, conn, scan);
-  }
-
-  @Override
-  protected TimelineEntity parseEntity(Result result) throws IOException {
-    if (result == null || result.isEmpty()) {
-      return null;
-    }
-    TimelineEntity entity = new TimelineEntity();
-    entity.setType(TimelineEntityType.YARN_APPLICATION.toString());
-    String entityId = ApplicationColumn.ID.readResult(result).toString();
-    entity.setId(entityId);
-
-    TimelineEntityFilters filters = getFilters();
-    // fetch created time
-    Long createdTime = (Long) 
ApplicationColumn.CREATED_TIME.readResult(result);
-    entity.setCreatedTime(createdTime);
-
-    EnumSet<Field> fieldsToRetrieve = 
getDataToRetrieve().getFieldsToRetrieve();
-    // fetch is related to entities and match isRelatedTo filter. If 
isRelatedTo
-    // filters do not match, entity would be dropped. We have to match filters
-    // locally as relevant HBase filters to filter out rows on the basis of
-    // isRelatedTo are not set in HBase scan.
-    boolean checkIsRelatedTo =
-        !isSingleEntityRead() && filters.getIsRelatedTo() != null &&
-        filters.getIsRelatedTo().getFilterList().size() > 0;
-    if (hasField(fieldsToRetrieve, Field.IS_RELATED_TO) || checkIsRelatedTo) {
-      readRelationship(entity, result, ApplicationColumnPrefix.IS_RELATED_TO,
-          true);
-      if (checkIsRelatedTo && !TimelineStorageUtils.matchIsRelatedTo(entity,
-          filters.getIsRelatedTo())) {
-        return null;
-      }
-      if (!hasField(fieldsToRetrieve,
-          Field.IS_RELATED_TO)) {
-        entity.getIsRelatedToEntities().clear();
-      }
-    }
-
-    // fetch relates to entities and match relatesTo filter. If relatesTo
-    // filters do not match, entity would be dropped. We have to match filters
-    // locally as relevant HBase filters to filter out rows on the basis of
-    // relatesTo are not set in HBase scan.
-    boolean checkRelatesTo =
-        !isSingleEntityRead() && filters.getRelatesTo() != null &&
-        filters.getRelatesTo().getFilterList().size() > 0;
-    if (hasField(fieldsToRetrieve, Field.RELATES_TO) ||
-        checkRelatesTo) {
-      readRelationship(entity, result, ApplicationColumnPrefix.RELATES_TO,
-          false);
-      if (checkRelatesTo && !TimelineStorageUtils.matchRelatesTo(entity,
-          filters.getRelatesTo())) {
-        return null;
-      }
-      if (!hasField(fieldsToRetrieve, Field.RELATES_TO)) {
-        entity.getRelatesToEntities().clear();
-      }
-    }
-
-    // fetch info if fieldsToRetrieve contains INFO or ALL.
-    if (hasField(fieldsToRetrieve, Field.INFO)) {
-      readKeyValuePairs(entity, result, ApplicationColumnPrefix.INFO, false);
-    }
-
-    // fetch configs if fieldsToRetrieve contains CONFIGS or ALL.
-    if (hasField(fieldsToRetrieve, Field.CONFIGS)) {
-      readKeyValuePairs(entity, result, ApplicationColumnPrefix.CONFIG, true);
-    }
-
-    // fetch events and match event filters if they exist. If event filters do
-    // not match, entity would be dropped. We have to match filters locally
-    // as relevant HBase filters to filter out rows on the basis of events
-    // are not set in HBase scan.
-    boolean checkEvents =
-        !isSingleEntityRead() && filters.getEventFilters() != null &&
-        filters.getEventFilters().getFilterList().size() > 0;
-    if (hasField(fieldsToRetrieve, Field.EVENTS) || checkEvents) {
-      readEvents(entity, result, ApplicationColumnPrefix.EVENT);
-      if (checkEvents && !TimelineStorageUtils.matchEventFilters(entity,
-          filters.getEventFilters())) {
-        return null;
-      }
-      if (!hasField(fieldsToRetrieve, Field.EVENTS)) {
-        entity.getEvents().clear();
-      }
-    }
-
-    // fetch metrics if fieldsToRetrieve contains METRICS or ALL.
-    if (hasField(fieldsToRetrieve, Field.METRICS)) {
-      readMetrics(entity, result, ApplicationColumnPrefix.METRIC);
-    }
-
-    ApplicationRowKey rowKey = ApplicationRowKey.parseRowKey(result.getRow());
-    entity.getInfo().put(TimelineReaderUtils.FROMID_KEY,
-        rowKey.getRowKeyAsString());
-    return entity;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/EntityTypeReader.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/EntityTypeReader.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/EntityTypeReader.java
deleted file mode 100644
index 3f72334..0000000
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/EntityTypeReader.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.timelineservice.storage.reader;
-
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.filter.FilterList;
-import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
-import org.apache.hadoop.hbase.filter.KeyOnlyFilter;
-import org.apache.hadoop.hbase.filter.PageFilter;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
-import 
org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
-import 
org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
-import 
org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey;
-import 
org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKeyPrefix;
-import 
org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Set;
-import java.util.TreeSet;
-
-/**
- * Timeline entity reader for listing all available entity types given one
- * reader context. Right now only supports listing all entity types within one
- * YARN application.
- */
-public final class EntityTypeReader extends AbstractTimelineStorageReader {
-
-  private static final Logger LOG =
-      LoggerFactory.getLogger(EntityTypeReader.class);
-  private static final EntityTable ENTITY_TABLE = new EntityTable();
-
-  public EntityTypeReader(TimelineReaderContext context) {
-    super(context);
-  }
-
-  /**
-   * Reads a set of timeline entity types from the HBase storage for the given
-   * context.
-   *
-   * @param hbaseConf HBase Configuration.
-   * @param conn HBase Connection.
-   * @return a set of <cite>TimelineEntity</cite> objects, with only type field
-   *         set.
-   * @throws IOException if any exception is encountered while reading 
entities.
-   */
-  public Set<String> readEntityTypes(Configuration hbaseConf,
-      Connection conn) throws IOException {
-
-    validateParams();
-    augmentParams(hbaseConf, conn);
-
-    Set<String> types = new TreeSet<>();
-    TimelineReaderContext context = getContext();
-    EntityRowKeyPrefix prefix = new EntityRowKeyPrefix(context.getClusterId(),
-        context.getUserId(), context.getFlowName(), context.getFlowRunId(),
-        context.getAppId());
-    byte[] currRowKey = prefix.getRowKeyPrefix();
-    byte[] nextRowKey = prefix.getRowKeyPrefix();
-    nextRowKey[nextRowKey.length - 1]++;
-
-    FilterList typeFilterList = new FilterList();
-    typeFilterList.addFilter(new FirstKeyOnlyFilter());
-    typeFilterList.addFilter(new KeyOnlyFilter());
-    typeFilterList.addFilter(new PageFilter(1));
-    LOG.debug("FilterList created for scan is - {}", typeFilterList);
-
-    int counter = 0;
-    while (true) {
-      try (ResultScanner results =
-          getResult(hbaseConf, conn, typeFilterList, currRowKey, nextRowKey)) {
-        TimelineEntity entity = parseEntityForType(results.next());
-        if (entity == null) {
-          break;
-        }
-        ++counter;
-        if (!types.add(entity.getType())) {
-          LOG.warn("Failed to add type " + entity.getType()
-              + " to the result set because there is a duplicated copy. ");
-        }
-        String currType = entity.getType();
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Current row key: " + Arrays.toString(currRowKey));
-          LOG.debug("New entity type discovered: " + currType);
-        }
-        currRowKey = getNextRowKey(prefix.getRowKeyPrefix(), currType);
-      }
-    }
-    LOG.debug("Scanned {} records for {} types", counter, types.size());
-    return types;
-  }
-
-  @Override
-  protected void validateParams() {
-    Preconditions.checkNotNull(getContext(), "context shouldn't be null");
-    Preconditions.checkNotNull(getContext().getClusterId(),
-        "clusterId shouldn't be null");
-    Preconditions.checkNotNull(getContext().getAppId(),
-        "appId shouldn't be null");
-  }
-
-  /**
-   * Gets the possibly next row key prefix given current prefix and type.
-   *
-   * @param currRowKeyPrefix The current prefix that contains user, cluster,
-   *                         flow, run, and application id.
-   * @param entityType Current entity type.
-   * @return A new prefix for the possibly immediately next row key.
-   */
-  private static byte[] getNextRowKey(byte[] currRowKeyPrefix,
-      String entityType) {
-    if (currRowKeyPrefix == null || entityType == null) {
-      return null;
-    }
-
-    byte[] entityTypeEncoded = Separator.QUALIFIERS.join(
-        Separator.encode(entityType, Separator.SPACE, Separator.TAB,
-            Separator.QUALIFIERS),
-        Separator.EMPTY_BYTES);
-
-    byte[] currRowKey
-        = new byte[currRowKeyPrefix.length + entityTypeEncoded.length];
-    System.arraycopy(currRowKeyPrefix, 0, currRowKey, 0,
-        currRowKeyPrefix.length);
-    System.arraycopy(entityTypeEncoded, 0, currRowKey, currRowKeyPrefix.length,
-        entityTypeEncoded.length);
-
-    return HBaseTimelineStorageUtils.calculateTheClosestNextRowKeyForPrefix(
-        currRowKey);
-  }
-
-  private ResultScanner getResult(Configuration hbaseConf, Connection conn,
-      FilterList filterList, byte[] startPrefix, byte[] endPrefix)
-      throws IOException {
-    Scan scan = new Scan(startPrefix, endPrefix);
-    scan.setFilter(filterList);
-    scan.setSmall(true);
-    return ENTITY_TABLE.getResultScanner(hbaseConf, conn, scan);
-  }
-
-  private TimelineEntity parseEntityForType(Result result)
-      throws IOException {
-    if (result == null || result.isEmpty()) {
-      return null;
-    }
-    TimelineEntity entity = new TimelineEntity();
-    EntityRowKey newRowKey = EntityRowKey.parseRowKey(result.getRow());
-    entity.setType(newRowKey.getEntityType());
-    return entity;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java
deleted file mode 100644
index a1cdb29..0000000
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java
+++ /dev/null
@@ -1,185 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.timelineservice.storage.reader;
-
-import java.io.IOException;
-import java.util.Map;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.filter.FilterList;
-import org.apache.hadoop.hbase.filter.PageFilter;
-import org.apache.hadoop.yarn.api.records.timelineservice.FlowActivityEntity;
-import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
-import 
org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve;
-import 
org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters;
-import 
org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
-import 
org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderUtils;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
-import 
org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
-import 
org.apache.hadoop.yarn.server.timelineservice.storage.common.LongKeyConverter;
-import 
org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityColumnPrefix;
-import 
org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKey;
-import 
org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKeyPrefix;
-import 
org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTable;
-import org.apache.hadoop.yarn.webapp.BadRequestException;
-
-import com.google.common.base.Preconditions;
-
-/**
- * Timeline entity reader for flow activity entities that are stored in the
- * flow activity table.
- */
-class FlowActivityEntityReader extends TimelineEntityReader {
-  private static final FlowActivityTable FLOW_ACTIVITY_TABLE =
-      new FlowActivityTable();
-
-  /**
-   * Used to convert Long key components to and from storage format.
-   */
-  private final KeyConverter<Long> longKeyConverter = new LongKeyConverter();
-
-
-  public FlowActivityEntityReader(TimelineReaderContext ctxt,
-      TimelineEntityFilters entityFilters, TimelineDataToRetrieve toRetrieve) {
-    super(ctxt, entityFilters, toRetrieve);
-  }
-
-  public FlowActivityEntityReader(TimelineReaderContext ctxt,
-      TimelineDataToRetrieve toRetrieve) {
-    super(ctxt, toRetrieve);
-  }
-
-  /**
-   * Uses the {@link FlowActivityTable}.
-   */
-  @Override
-  protected BaseTable<?> getTable() {
-    return FLOW_ACTIVITY_TABLE;
-  }
-
-  @Override
-  protected void validateParams() {
-    Preconditions.checkNotNull(getContext().getClusterId(),
-        "clusterId shouldn't be null");
-  }
-
-  @Override
-  protected void augmentParams(Configuration hbaseConf, Connection conn)
-      throws IOException {
-    createFiltersIfNull();
-  }
-
-  @Override
-  protected FilterList constructFilterListBasedOnFilters() throws IOException {
-    return null;
-  }
-
-  @Override
-  protected FilterList constructFilterListBasedOnFields() {
-    return null;
-  }
-
-  @Override
-  protected Result getResult(Configuration hbaseConf, Connection conn,
-      FilterList filterList) throws IOException {
-    throw new UnsupportedOperationException(
-        "we don't support a single entity query");
-  }
-
-  @Override
-  protected ResultScanner getResults(Configuration hbaseConf,
-      Connection conn, FilterList filterList) throws IOException {
-    Scan scan = new Scan();
-    String clusterId = getContext().getClusterId();
-    if (getFilters().getFromId() == null
-        && getFilters().getCreatedTimeBegin() == 0L
-        && getFilters().getCreatedTimeEnd() == Long.MAX_VALUE) {
-       // All records have to be chosen.
-      scan.setRowPrefixFilter(new FlowActivityRowKeyPrefix(clusterId)
-          .getRowKeyPrefix());
-    } else if (getFilters().getFromId() != null) {
-      FlowActivityRowKey key = null;
-      try {
-        key =
-            FlowActivityRowKey.parseRowKeyFromString(getFilters().getFromId());
-      } catch (IllegalArgumentException e) {
-        throw new BadRequestException("Invalid filter fromid is provided.");
-      }
-      if (!clusterId.equals(key.getClusterId())) {
-        throw new BadRequestException(
-            "fromid doesn't belong to clusterId=" + clusterId);
-      }
-      scan.setStartRow(key.getRowKey());
-      scan.setStopRow(
-          new FlowActivityRowKeyPrefix(clusterId,
-              (getFilters().getCreatedTimeBegin() <= 0 ? 0
-                  : (getFilters().getCreatedTimeBegin() - 1)))
-                      .getRowKeyPrefix());
-    } else {
-      scan.setStartRow(new FlowActivityRowKeyPrefix(clusterId, getFilters()
-          .getCreatedTimeEnd()).getRowKeyPrefix());
-      scan.setStopRow(new FlowActivityRowKeyPrefix(clusterId, (getFilters()
-          .getCreatedTimeBegin() <= 0 ? 0
-          : (getFilters().getCreatedTimeBegin() - 1))).getRowKeyPrefix());
-    }
-    // use the page filter to limit the result to the page size
-    // the scanner may still return more than the limit; therefore we need to
-    // read the right number as we iterate
-    scan.setFilter(new PageFilter(getFilters().getLimit()));
-    return getTable().getResultScanner(hbaseConf, conn, scan);
-  }
-
-  @Override
-  protected TimelineEntity parseEntity(Result result) throws IOException {
-    FlowActivityRowKey rowKey = 
FlowActivityRowKey.parseRowKey(result.getRow());
-
-    Long time = rowKey.getDayTimestamp();
-    String user = rowKey.getUserId();
-    String flowName = rowKey.getFlowName();
-
-    FlowActivityEntity flowActivity = new FlowActivityEntity(
-        getContext().getClusterId(), time, user, flowName);
-    // set the id
-    flowActivity.setId(flowActivity.getId());
-    // get the list of run ids along with the version that are associated with
-    // this flow on this day
-    Map<Long, Object> runIdsMap =
-        FlowActivityColumnPrefix.RUN_ID.readResults(result, longKeyConverter);
-    for (Map.Entry<Long, Object> e : runIdsMap.entrySet()) {
-      Long runId = e.getKey();
-      String version = (String)e.getValue();
-      FlowRunEntity flowRun = new FlowRunEntity();
-      flowRun.setUser(user);
-      flowRun.setName(flowName);
-      flowRun.setRunId(runId);
-      flowRun.setVersion(version);
-      // set the id
-      flowRun.setId(flowRun.getId());
-      flowActivity.addFlowRun(flowRun);
-    }
-    flowActivity.getInfo().put(TimelineReaderUtils.FROMID_KEY,
-        rowKey.getRowKeyAsString());
-    return flowActivity;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java
deleted file mode 100644
index af043b3..0000000
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java
+++ /dev/null
@@ -1,294 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.timelineservice.storage.reader;
-
-import java.io.IOException;
-import java.util.EnumSet;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.filter.BinaryComparator;
-import org.apache.hadoop.hbase.filter.BinaryPrefixComparator;
-import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
-import org.apache.hadoop.hbase.filter.FamilyFilter;
-import org.apache.hadoop.hbase.filter.FilterList;
-import org.apache.hadoop.hbase.filter.FilterList.Operator;
-import org.apache.hadoop.hbase.filter.PageFilter;
-import org.apache.hadoop.hbase.filter.QualifierFilter;
-import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
-import 
org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve;
-import 
org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters;
-import 
org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
-import 
org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderUtils;
-import 
org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
-import 
org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterUtils;
-import 
org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
-import 
org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
-import 
org.apache.hadoop.yarn.server.timelineservice.storage.common.RowKeyPrefix;
-import 
org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumn;
-import 
org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnFamily;
-import 
org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnPrefix;
-import 
org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKey;
-import 
org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKeyPrefix;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable;
-import org.apache.hadoop.yarn.webapp.BadRequestException;
-
-import com.google.common.base.Preconditions;
-
-/**
- * Timeline entity reader for flow run entities that are stored in the flow run
- * table.
- */
-class FlowRunEntityReader extends TimelineEntityReader {
-  private static final FlowRunTable FLOW_RUN_TABLE = new FlowRunTable();
-
-  public FlowRunEntityReader(TimelineReaderContext ctxt,
-      TimelineEntityFilters entityFilters, TimelineDataToRetrieve toRetrieve) {
-    super(ctxt, entityFilters, toRetrieve);
-  }
-
-  public FlowRunEntityReader(TimelineReaderContext ctxt,
-      TimelineDataToRetrieve toRetrieve) {
-    super(ctxt, toRetrieve);
-  }
-
-  /**
-   * Uses the {@link FlowRunTable}.
-   */
-  @Override
-  protected BaseTable<?> getTable() {
-    return FLOW_RUN_TABLE;
-  }
-
-  @Override
-  protected void validateParams() {
-    Preconditions.checkNotNull(getContext(), "context shouldn't be null");
-    Preconditions.checkNotNull(getDataToRetrieve(),
-        "data to retrieve shouldn't be null");
-    Preconditions.checkNotNull(getContext().getClusterId(),
-        "clusterId shouldn't be null");
-    Preconditions.checkNotNull(getContext().getUserId(),
-        "userId shouldn't be null");
-    Preconditions.checkNotNull(getContext().getFlowName(),
-        "flowName shouldn't be null");
-    if (isSingleEntityRead()) {
-      Preconditions.checkNotNull(getContext().getFlowRunId(),
-          "flowRunId shouldn't be null");
-    }
-    EnumSet<Field> fieldsToRetrieve = 
getDataToRetrieve().getFieldsToRetrieve();
-    if (!isSingleEntityRead() && fieldsToRetrieve != null) {
-      for (Field field : fieldsToRetrieve) {
-        if (field != Field.ALL && field != Field.METRICS) {
-          throw new BadRequestException("Invalid field " + field
-              + " specified while querying flow runs.");
-        }
-      }
-    }
-  }
-
-  @Override
-  protected void augmentParams(Configuration hbaseConf, Connection conn) {
-    // Add metrics to fields to retrieve if metricsToRetrieve is specified.
-    getDataToRetrieve().addFieldsBasedOnConfsAndMetricsToRetrieve();
-    if (!isSingleEntityRead()) {
-      createFiltersIfNull();
-    }
-  }
-
-  protected FilterList constructFilterListBasedOnFilters() throws IOException {
-    FilterList listBasedOnFilters = new FilterList();
-    // Filter based on created time range.
-    Long createdTimeBegin = getFilters().getCreatedTimeBegin();
-    Long createdTimeEnd = getFilters().getCreatedTimeEnd();
-    if (createdTimeBegin != 0 || createdTimeEnd != Long.MAX_VALUE) {
-      listBasedOnFilters.addFilter(TimelineFilterUtils
-          .createSingleColValueFiltersByRange(FlowRunColumn.MIN_START_TIME,
-              createdTimeBegin, createdTimeEnd));
-    }
-    // Filter based on metric filters.
-    TimelineFilterList metricFilters = getFilters().getMetricFilters();
-    if (metricFilters != null && !metricFilters.getFilterList().isEmpty()) {
-      listBasedOnFilters.addFilter(TimelineFilterUtils.createHBaseFilterList(
-          FlowRunColumnPrefix.METRIC, metricFilters));
-    }
-    return listBasedOnFilters;
-  }
-
-  /**
-   * Add {@link QualifierFilter} filters to filter list for each column of flow
-   * run table.
-   *
-   * @return filter list to which qualifier filters have been added.
-   */
-  private FilterList updateFixedColumns() {
-    FilterList columnsList = new FilterList(Operator.MUST_PASS_ONE);
-    for (FlowRunColumn column : FlowRunColumn.values()) {
-      columnsList.addFilter(new QualifierFilter(CompareOp.EQUAL,
-          new BinaryComparator(column.getColumnQualifierBytes())));
-    }
-    return columnsList;
-  }
-
-  @Override
-  protected FilterList constructFilterListBasedOnFields() throws IOException {
-    FilterList list = new FilterList(Operator.MUST_PASS_ONE);
-    // By default fetch everything in INFO column family.
-    FamilyFilter infoColumnFamily =
-        new FamilyFilter(CompareOp.EQUAL, new BinaryComparator(
-            FlowRunColumnFamily.INFO.getBytes()));
-    TimelineDataToRetrieve dataToRetrieve = getDataToRetrieve();
-    // If multiple entities have to be retrieved, check if metrics have to be
-    // retrieved and if not, add a filter so that metrics can be excluded.
-    // Metrics are always returned if we are reading a single entity.
-    if (!isSingleEntityRead()
-        && !hasField(dataToRetrieve.getFieldsToRetrieve(), Field.METRICS)) {
-      FilterList infoColFamilyList = new FilterList(Operator.MUST_PASS_ONE);
-      infoColFamilyList.addFilter(infoColumnFamily);
-      infoColFamilyList.addFilter(new QualifierFilter(CompareOp.NOT_EQUAL,
-          new BinaryPrefixComparator(FlowRunColumnPrefix.METRIC
-              .getColumnPrefixBytes(""))));
-      list.addFilter(infoColFamilyList);
-    } else {
-      // Check if metricsToRetrieve are specified and if they are, create a
-      // filter list for info column family by adding flow run tables columns
-      // and a list for metrics to retrieve. Pls note that fieldsToRetrieve
-      // will have METRICS added to it if metricsToRetrieve are specified
-      // (in augmentParams()).
-      TimelineFilterList metricsToRetrieve =
-          dataToRetrieve.getMetricsToRetrieve();
-      if (metricsToRetrieve != null
-          && !metricsToRetrieve.getFilterList().isEmpty()) {
-        FilterList infoColFamilyList = new FilterList();
-        infoColFamilyList.addFilter(infoColumnFamily);
-        FilterList columnsList = updateFixedColumns();
-        columnsList.addFilter(TimelineFilterUtils.createHBaseFilterList(
-            FlowRunColumnPrefix.METRIC, metricsToRetrieve));
-        infoColFamilyList.addFilter(columnsList);
-        list.addFilter(infoColFamilyList);
-      }
-    }
-    return list;
-  }
-
-  @Override
-  protected Result getResult(Configuration hbaseConf, Connection conn,
-      FilterList filterList) throws IOException {
-    TimelineReaderContext context = getContext();
-    FlowRunRowKey flowRunRowKey =
-        new FlowRunRowKey(context.getClusterId(), context.getUserId(),
-            context.getFlowName(), context.getFlowRunId());
-    byte[] rowKey = flowRunRowKey.getRowKey();
-    Get get = new Get(rowKey);
-    get.setMaxVersions(Integer.MAX_VALUE);
-    if (filterList != null && !filterList.getFilters().isEmpty()) {
-      get.setFilter(filterList);
-    }
-    return getTable().getResult(hbaseConf, conn, get);
-  }
-
-  @Override
-  protected ResultScanner getResults(Configuration hbaseConf, Connection conn,
-      FilterList filterList) throws IOException {
-    Scan scan = new Scan();
-    TimelineReaderContext context = getContext();
-    RowKeyPrefix<FlowRunRowKey> flowRunRowKeyPrefix = null;
-    if (getFilters().getFromId() == null) {
-      flowRunRowKeyPrefix = new FlowRunRowKeyPrefix(context.getClusterId(),
-          context.getUserId(), context.getFlowName());
-      scan.setRowPrefixFilter(flowRunRowKeyPrefix.getRowKeyPrefix());
-    } else {
-      FlowRunRowKey flowRunRowKey = null;
-      try {
-        flowRunRowKey =
-            FlowRunRowKey.parseRowKeyFromString(getFilters().getFromId());
-      } catch (IllegalArgumentException e) {
-        throw new BadRequestException("Invalid filter fromid is provided.");
-      }
-      if (!context.getClusterId().equals(flowRunRowKey.getClusterId())) {
-        throw new BadRequestException(
-            "fromid doesn't belong to clusterId=" + context.getClusterId());
-      }
-      // set start row
-      scan.setStartRow(flowRunRowKey.getRowKey());
-
-      // get the bytes for stop row
-      flowRunRowKeyPrefix = new FlowRunRowKeyPrefix(context.getClusterId(),
-          context.getUserId(), context.getFlowName());
-
-      // set stop row
-      scan.setStopRow(
-          HBaseTimelineStorageUtils.calculateTheClosestNextRowKeyForPrefix(
-              flowRunRowKeyPrefix.getRowKeyPrefix()));
-    }
-
-    FilterList newList = new FilterList();
-    newList.addFilter(new PageFilter(getFilters().getLimit()));
-    if (filterList != null && !filterList.getFilters().isEmpty()) {
-      newList.addFilter(filterList);
-    }
-    scan.setFilter(newList);
-    scan.setMaxVersions(Integer.MAX_VALUE);
-    return getTable().getResultScanner(hbaseConf, conn, scan);
-  }
-
-  @Override
-  protected TimelineEntity parseEntity(Result result) throws IOException {
-    FlowRunEntity flowRun = new FlowRunEntity();
-    FlowRunRowKey rowKey = FlowRunRowKey.parseRowKey(result.getRow());
-    flowRun.setRunId(rowKey.getFlowRunId());
-    flowRun.setUser(rowKey.getUserId());
-    flowRun.setName(rowKey.getFlowName());
-
-    // read the start time
-    Long startTime = (Long) FlowRunColumn.MIN_START_TIME.readResult(result);
-    if (startTime != null) {
-      flowRun.setStartTime(startTime.longValue());
-    }
-
-    // read the end time if available
-    Long endTime = (Long) FlowRunColumn.MAX_END_TIME.readResult(result);
-    if (endTime != null) {
-      flowRun.setMaxEndTime(endTime.longValue());
-    }
-
-    // read the flow version
-    String version = (String) FlowRunColumn.FLOW_VERSION.readResult(result);
-    if (version != null) {
-      flowRun.setVersion(version);
-    }
-
-    // read metrics if its a single entity query or if METRICS are part of
-    // fieldsToRetrieve.
-    if (isSingleEntityRead()
-        || hasField(getDataToRetrieve().getFieldsToRetrieve(), Field.METRICS)) 
{
-      readMetrics(flowRun, result, FlowRunColumnPrefix.METRIC);
-    }
-
-    // set the id
-    flowRun.setId(flowRun.getId());
-    flowRun.getInfo().put(TimelineReaderUtils.FROMID_KEY,
-        rowKey.getRowKeyAsString());
-    return flowRun;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java
deleted file mode 100644
index 3a44445..0000000
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java
+++ /dev/null
@@ -1,651 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.timelineservice.storage.reader;
-
-import java.io.IOException;
-import java.util.EnumSet;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Query;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.filter.BinaryComparator;
-import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
-import org.apache.hadoop.hbase.filter.FamilyFilter;
-import org.apache.hadoop.hbase.filter.FilterList;
-import org.apache.hadoop.hbase.filter.PageFilter;
-import org.apache.hadoop.hbase.filter.FilterList.Operator;
-import org.apache.hadoop.hbase.filter.QualifierFilter;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
-import 
org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve;
-import 
org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters;
-import 
org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
-import 
org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderUtils;
-import 
org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
-import 
org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterUtils;
-import 
org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
-import 
org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
-import 
org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
-import 
org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
-import 
org.apache.hadoop.yarn.server.timelineservice.storage.common.RowKeyPrefix;
-import 
org.apache.hadoop.yarn.server.timelineservice.storage.common.StringKeyConverter;
-import 
org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
-import 
org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn;
-import 
org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnFamily;
-import 
org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix;
-import 
org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey;
-import 
org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKeyPrefix;
-import 
org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
-import org.apache.hadoop.yarn.webapp.BadRequestException;
-
-import com.google.common.base.Preconditions;
-
-/**
- * Timeline entity reader for generic entities that are stored in the entity
- * table.
- */
-class GenericEntityReader extends TimelineEntityReader {
-  private static final EntityTable ENTITY_TABLE = new EntityTable();
-
-  /**
-   * Used to convert strings key components to and from storage format.
-   */
-  private final KeyConverter<String> stringKeyConverter =
-      new StringKeyConverter();
-
-  public GenericEntityReader(TimelineReaderContext ctxt,
-      TimelineEntityFilters entityFilters, TimelineDataToRetrieve toRetrieve) {
-    super(ctxt, entityFilters, toRetrieve);
-  }
-
-  public GenericEntityReader(TimelineReaderContext ctxt,
-      TimelineDataToRetrieve toRetrieve) {
-    super(ctxt, toRetrieve);
-  }
-
-  /**
-   * Uses the {@link EntityTable}.
-   */
-  protected BaseTable<?> getTable() {
-    return ENTITY_TABLE;
-  }
-
-  @Override
-  protected FilterList constructFilterListBasedOnFilters() throws IOException {
-    // Filters here cannot be null for multiple entity reads as they are set in
-    // augmentParams if null.
-    FilterList listBasedOnFilters = new FilterList();
-    TimelineEntityFilters filters = getFilters();
-    // Create filter list based on created time range and add it to
-    // listBasedOnFilters.
-    long createdTimeBegin = filters.getCreatedTimeBegin();
-    long createdTimeEnd = filters.getCreatedTimeEnd();
-    if (createdTimeBegin != 0 || createdTimeEnd != Long.MAX_VALUE) {
-      listBasedOnFilters.addFilter(TimelineFilterUtils
-          .createSingleColValueFiltersByRange(EntityColumn.CREATED_TIME,
-              createdTimeBegin, createdTimeEnd));
-    }
-    // Create filter list based on metric filters and add it to
-    // listBasedOnFilters.
-    TimelineFilterList metricFilters = filters.getMetricFilters();
-    if (metricFilters != null && !metricFilters.getFilterList().isEmpty()) {
-      listBasedOnFilters.addFilter(TimelineFilterUtils.createHBaseFilterList(
-          EntityColumnPrefix.METRIC, metricFilters));
-    }
-    // Create filter list based on config filters and add it to
-    // listBasedOnFilters.
-    TimelineFilterList configFilters = filters.getConfigFilters();
-    if (configFilters != null && !configFilters.getFilterList().isEmpty()) {
-      listBasedOnFilters.addFilter(TimelineFilterUtils.createHBaseFilterList(
-          EntityColumnPrefix.CONFIG, configFilters));
-    }
-    // Create filter list based on info filters and add it to 
listBasedOnFilters
-    TimelineFilterList infoFilters = filters.getInfoFilters();
-    if (infoFilters != null && !infoFilters.getFilterList().isEmpty()) {
-      listBasedOnFilters.addFilter(TimelineFilterUtils.createHBaseFilterList(
-          EntityColumnPrefix.INFO, infoFilters));
-    }
-    return listBasedOnFilters;
-  }
-
-  /**
-   * Check if we need to fetch only some of the event columns.
-   *
-   * @return true if we need to fetch some of the columns, false otherwise.
-   */
-  protected boolean fetchPartialEventCols(TimelineFilterList eventFilters,
-      EnumSet<Field> fieldsToRetrieve) {
-    return (eventFilters != null && !eventFilters.getFilterList().isEmpty() &&
-        !hasField(fieldsToRetrieve, Field.EVENTS));
-  }
-
-  /**
-   * Check if we need to fetch only some of the relates_to columns.
-   *
-   * @return true if we need to fetch some of the columns, false otherwise.
-   */
-  protected boolean fetchPartialRelatesToCols(TimelineFilterList relatesTo,
-      EnumSet<Field> fieldsToRetrieve) {
-    return (relatesTo != null && !relatesTo.getFilterList().isEmpty() &&
-        !hasField(fieldsToRetrieve, Field.RELATES_TO));
-  }
-
-  /**
-   * Check if we need to fetch only some of the is_related_to columns.
-   *
-   * @return true if we need to fetch some of the columns, false otherwise.
-   */
-  private boolean fetchPartialIsRelatedToCols(TimelineFilterList isRelatedTo,
-      EnumSet<Field> fieldsToRetrieve) {
-    return (isRelatedTo != null && !isRelatedTo.getFilterList().isEmpty() &&
-        !hasField(fieldsToRetrieve, Field.IS_RELATED_TO));
-  }
-
-  /**
-   * Check if we need to fetch only some of the columns based on event filters,
-   * relatesto and isrelatedto from info family.
-   *
-   * @return true, if we need to fetch only some of the columns, false if we
-   *         need to fetch all the columns under info column family.
-   */
-  protected boolean fetchPartialColsFromInfoFamily() {
-    EnumSet<Field> fieldsToRetrieve = 
getDataToRetrieve().getFieldsToRetrieve();
-    TimelineEntityFilters filters = getFilters();
-    return fetchPartialEventCols(filters.getEventFilters(), fieldsToRetrieve)
-        || fetchPartialRelatesToCols(filters.getRelatesTo(), fieldsToRetrieve)
-        || fetchPartialIsRelatedToCols(filters.getIsRelatedTo(),
-            fieldsToRetrieve);
-  }
-
-  /**
-   * Check if we need to create filter list based on fields. We need to create 
a
-   * filter list iff all fields need not be retrieved or we have some specific
-   * fields or metrics to retrieve. We also need to create a filter list if we
-   * have relationships(relatesTo/isRelatedTo) and event filters specified for
-   * the query.
-   *
-   * @return true if we need to create the filter list, false otherwise.
-   */
-  protected boolean needCreateFilterListBasedOnFields() {
-    TimelineDataToRetrieve dataToRetrieve = getDataToRetrieve();
-    // Check if all fields are to be retrieved or not. If all fields have to
-    // be retrieved, also check if we have some metrics or configs to
-    // retrieve specified for the query because then a filter list will have
-    // to be created.
-    boolean flag =
-        !dataToRetrieve.getFieldsToRetrieve().contains(Field.ALL)
-            || (dataToRetrieve.getConfsToRetrieve() != null && !dataToRetrieve
-                .getConfsToRetrieve().getFilterList().isEmpty())
-            || (dataToRetrieve.getMetricsToRetrieve() != null && 
!dataToRetrieve
-                .getMetricsToRetrieve().getFilterList().isEmpty());
-    // Filters need to be checked only if we are reading multiple entities. If
-    // condition above is false, we check if there are relationships(relatesTo/
-    // isRelatedTo) and event filters specified for the query.
-    if (!flag && !isSingleEntityRead()) {
-      TimelineEntityFilters filters = getFilters();
-      flag =
-          (filters.getEventFilters() != null && !filters.getEventFilters()
-              .getFilterList().isEmpty())
-              || (filters.getIsRelatedTo() != null && !filters.getIsRelatedTo()
-                  .getFilterList().isEmpty())
-              || (filters.getRelatesTo() != null && !filters.getRelatesTo()
-                  .getFilterList().isEmpty());
-    }
-    return flag;
-  }
-
-  /**
-   * Add {@link QualifierFilter} filters to filter list for each column of
-   * entity table.
-   *
-   * @param list filter list to which qualifier filters have to be added.
-   */
-  protected void updateFixedColumns(FilterList list) {
-    for (EntityColumn column : EntityColumn.values()) {
-      list.addFilter(new QualifierFilter(CompareOp.EQUAL, new BinaryComparator(
-          column.getColumnQualifierBytes())));
-    }
-  }
-
-  /**
-   * Creates a filter list which indicates that only some of the column
-   * qualifiers in the info column family will be returned in result.
-   *
-   * @param isApplication If true, it means operations are to be performed for
-   *          application table, otherwise for entity table.
-   * @return filter list.
-   * @throws IOException if any problem occurs while creating filter list.
-   */
-  private FilterList createFilterListForColsOfInfoFamily() throws IOException {
-    FilterList infoFamilyColsFilter = new FilterList(Operator.MUST_PASS_ONE);
-    // Add filters for each column in entity table.
-    updateFixedColumns(infoFamilyColsFilter);
-    EnumSet<Field> fieldsToRetrieve = 
getDataToRetrieve().getFieldsToRetrieve();
-    // If INFO field has to be retrieved, add a filter for fetching columns
-    // with INFO column prefix.
-    if (hasField(fieldsToRetrieve, Field.INFO)) {
-      infoFamilyColsFilter
-          .addFilter(TimelineFilterUtils.createHBaseQualifierFilter(
-              CompareOp.EQUAL, EntityColumnPrefix.INFO));
-    }
-    TimelineFilterList relatesTo = getFilters().getRelatesTo();
-    if (hasField(fieldsToRetrieve, Field.RELATES_TO)) {
-      // If RELATES_TO field has to be retrieved, add a filter for fetching
-      // columns with RELATES_TO column prefix.
-      infoFamilyColsFilter.addFilter(TimelineFilterUtils
-          .createHBaseQualifierFilter(CompareOp.EQUAL,
-              EntityColumnPrefix.RELATES_TO));
-    } else if (relatesTo != null && !relatesTo.getFilterList().isEmpty()) {
-      // Even if fields to retrieve does not contain RELATES_TO, we still
-      // need to have a filter to fetch some of the column qualifiers if
-      // relatesTo filters are specified. relatesTo filters will then be
-      // matched after fetching rows from HBase.
-      Set<String> relatesToCols =
-          TimelineFilterUtils.fetchColumnsFromFilterList(relatesTo);
-      infoFamilyColsFilter.addFilter(createFiltersFromColumnQualifiers(
-          EntityColumnPrefix.RELATES_TO, relatesToCols));
-    }
-    TimelineFilterList isRelatedTo = getFilters().getIsRelatedTo();
-    if (hasField(fieldsToRetrieve, Field.IS_RELATED_TO)) {
-      // If IS_RELATED_TO field has to be retrieved, add a filter for fetching
-      // columns with IS_RELATED_TO column prefix.
-      infoFamilyColsFilter.addFilter(TimelineFilterUtils
-          .createHBaseQualifierFilter(CompareOp.EQUAL,
-              EntityColumnPrefix.IS_RELATED_TO));
-    } else if (isRelatedTo != null && !isRelatedTo.getFilterList().isEmpty()) {
-      // Even if fields to retrieve does not contain IS_RELATED_TO, we still
-      // need to have a filter to fetch some of the column qualifiers if
-      // isRelatedTo filters are specified. isRelatedTo filters will then be
-      // matched after fetching rows from HBase.
-      Set<String> isRelatedToCols =
-          TimelineFilterUtils.fetchColumnsFromFilterList(isRelatedTo);
-      infoFamilyColsFilter.addFilter(createFiltersFromColumnQualifiers(
-          EntityColumnPrefix.IS_RELATED_TO, isRelatedToCols));
-    }
-    TimelineFilterList eventFilters = getFilters().getEventFilters();
-    if (hasField(fieldsToRetrieve, Field.EVENTS)) {
-      // If EVENTS field has to be retrieved, add a filter for fetching columns
-      // with EVENT column prefix.
-      infoFamilyColsFilter
-          .addFilter(TimelineFilterUtils.createHBaseQualifierFilter(
-              CompareOp.EQUAL, EntityColumnPrefix.EVENT));
-    } else if (eventFilters != null &&
-        !eventFilters.getFilterList().isEmpty()) {
-      // Even if fields to retrieve does not contain EVENTS, we still need to
-      // have a filter to fetch some of the column qualifiers on the basis of
-      // event filters specified. Event filters will then be matched after
-      // fetching rows from HBase.
-      Set<String> eventCols =
-          TimelineFilterUtils.fetchColumnsFromFilterList(eventFilters);
-      infoFamilyColsFilter.addFilter(createFiltersFromColumnQualifiers(
-          EntityColumnPrefix.EVENT, eventCols));
-    }
-    return infoFamilyColsFilter;
-  }
-
-  /**
-   * Exclude column prefixes via filters which are not required(based on fields
-   * to retrieve) from info column family. These filters are added to filter
-   * list which contains a filter for getting info column family.
-   *
-   * @param infoColFamilyList filter list for info column family.
-   */
-  private void excludeFieldsFromInfoColFamily(FilterList infoColFamilyList) {
-    EnumSet<Field> fieldsToRetrieve = 
getDataToRetrieve().getFieldsToRetrieve();
-    // Events not required.
-    if (!hasField(fieldsToRetrieve, Field.EVENTS)) {
-      infoColFamilyList.addFilter(TimelineFilterUtils
-          .createHBaseQualifierFilter(CompareOp.NOT_EQUAL,
-              EntityColumnPrefix.EVENT));
-    }
-    // info not required.
-    if (!hasField(fieldsToRetrieve, Field.INFO)) {
-      infoColFamilyList.addFilter(TimelineFilterUtils
-          .createHBaseQualifierFilter(CompareOp.NOT_EQUAL,
-              EntityColumnPrefix.INFO));
-    }
-    // is related to not required.
-    if (!hasField(fieldsToRetrieve, Field.IS_RELATED_TO)) {
-      infoColFamilyList.addFilter(TimelineFilterUtils
-          .createHBaseQualifierFilter(CompareOp.NOT_EQUAL,
-              EntityColumnPrefix.IS_RELATED_TO));
-    }
-    // relates to not required.
-    if (!hasField(fieldsToRetrieve, Field.RELATES_TO)) {
-      infoColFamilyList.addFilter(TimelineFilterUtils
-          .createHBaseQualifierFilter(CompareOp.NOT_EQUAL,
-              EntityColumnPrefix.RELATES_TO));
-    }
-  }
-
-  /**
-   * Updates filter list based on fields for confs and metrics to retrieve.
-   *
-   * @param listBasedOnFields filter list based on fields.
-   * @throws IOException if any problem occurs while updating filter list.
-   */
-  private void updateFilterForConfsAndMetricsToRetrieve(
-      FilterList listBasedOnFields) throws IOException {
-    TimelineDataToRetrieve dataToRetrieve = getDataToRetrieve();
-    // Please note that if confsToRetrieve is specified, we would have added
-    // CONFS to fields to retrieve in augmentParams() even if not specified.
-    if (dataToRetrieve.getFieldsToRetrieve().contains(Field.CONFIGS)) {
-      // Create a filter list for configs.
-      listBasedOnFields.addFilter(TimelineFilterUtils
-          .createFilterForConfsOrMetricsToRetrieve(
-              dataToRetrieve.getConfsToRetrieve(), EntityColumnFamily.CONFIGS,
-              EntityColumnPrefix.CONFIG));
-    }
-
-    // Please note that if metricsToRetrieve is specified, we would have added
-    // METRICS to fields to retrieve in augmentParams() even if not specified.
-    if (dataToRetrieve.getFieldsToRetrieve().contains(Field.METRICS)) {
-      // Create a filter list for metrics.
-      listBasedOnFields.addFilter(TimelineFilterUtils
-          .createFilterForConfsOrMetricsToRetrieve(
-              dataToRetrieve.getMetricsToRetrieve(),
-              EntityColumnFamily.METRICS, EntityColumnPrefix.METRIC));
-    }
-  }
-
-  @Override
-  protected FilterList constructFilterListBasedOnFields() throws IOException {
-    if (!needCreateFilterListBasedOnFields()) {
-      // Fetch all the columns. No need of a filter.
-      return null;
-    }
-    FilterList listBasedOnFields = new FilterList(Operator.MUST_PASS_ONE);
-    FilterList infoColFamilyList = new FilterList();
-    // By default fetch everything in INFO column family.
-    FamilyFilter infoColumnFamily =
-        new FamilyFilter(CompareOp.EQUAL, new BinaryComparator(
-            EntityColumnFamily.INFO.getBytes()));
-    infoColFamilyList.addFilter(infoColumnFamily);
-    if (!isSingleEntityRead() && fetchPartialColsFromInfoFamily()) {
-      // We can fetch only some of the columns from info family.
-      infoColFamilyList.addFilter(createFilterListForColsOfInfoFamily());
-    } else {
-      // Exclude column prefixes in info column family which are not required
-      // based on fields to retrieve.
-      excludeFieldsFromInfoColFamily(infoColFamilyList);
-    }
-    listBasedOnFields.addFilter(infoColFamilyList);
-    updateFilterForConfsAndMetricsToRetrieve(listBasedOnFields);
-    return listBasedOnFields;
-  }
-
-  @Override
-  protected void validateParams() {
-    Preconditions.checkNotNull(getContext(), "context shouldn't be null");
-    Preconditions.checkNotNull(getDataToRetrieve(),
-        "data to retrieve shouldn't be null");
-    Preconditions.checkNotNull(getContext().getClusterId(),
-        "clusterId shouldn't be null");
-    Preconditions.checkNotNull(getContext().getAppId(),
-        "appId shouldn't be null");
-    Preconditions.checkNotNull(getContext().getEntityType(),
-        "entityType shouldn't be null");
-    if (isSingleEntityRead()) {
-      Preconditions.checkNotNull(getContext().getEntityId(),
-          "entityId shouldn't be null");
-    }
-  }
-
-  @Override
-  protected void augmentParams(Configuration hbaseConf, Connection conn)
-      throws IOException {
-    defaultAugmentParams(hbaseConf, conn);
-    // Add configs/metrics to fields to retrieve if confsToRetrieve and/or
-    // metricsToRetrieve are specified.
-    getDataToRetrieve().addFieldsBasedOnConfsAndMetricsToRetrieve();
-    if (!isSingleEntityRead()) {
-      createFiltersIfNull();
-    }
-  }
-
-  @Override
-  protected Result getResult(Configuration hbaseConf, Connection conn,
-      FilterList filterList) throws IOException {
-    TimelineReaderContext context = getContext();
-    Result result = null;
-    if (context.getEntityIdPrefix() != null) {
-      byte[] rowKey = new EntityRowKey(context.getClusterId(),
-          context.getUserId(), context.getFlowName(), context.getFlowRunId(),
-          context.getAppId(), context.getEntityType(),
-          context.getEntityIdPrefix(), context.getEntityId()).getRowKey();
-      Get get = new Get(rowKey);
-      setMetricsTimeRange(get);
-      get.setMaxVersions(getDataToRetrieve().getMetricsLimit());
-      if (filterList != null && !filterList.getFilters().isEmpty()) {
-        get.setFilter(filterList);
-      }
-      result = getTable().getResult(hbaseConf, conn, get);
-
-    } else {
-      // Prepare for range scan
-      // create single SingleColumnValueFilter and add to existing filters.
-      FilterList filter = new FilterList(Operator.MUST_PASS_ALL);
-      if (filterList != null && !filterList.getFilters().isEmpty()) {
-        filter.addFilter(filterList);
-      }
-      FilterList newFilter = new FilterList();
-      newFilter.addFilter(TimelineFilterUtils.createHBaseSingleColValueFilter(
-          EntityColumn.ID, context.getEntityId(), CompareOp.EQUAL));
-      newFilter.addFilter(new PageFilter(1));
-      filter.addFilter(newFilter);
-
-      ResultScanner results = getResults(hbaseConf, conn, filter);
-      try {
-        Iterator<Result> iterator = results.iterator();
-        if (iterator.hasNext()) {
-          result = iterator.next();
-        }
-      } finally {
-        results.close();
-      }
-    }
-    return result;
-  }
-
-  private void setMetricsTimeRange(Query query) {
-    // Set time range for metric values.
-    HBaseTimelineStorageUtils.setMetricsTimeRange(
-        query, EntityColumnFamily.METRICS.getBytes(),
-        getDataToRetrieve().getMetricsTimeBegin(),
-        getDataToRetrieve().getMetricsTimeEnd());
-  }
-
-  @Override
-  protected ResultScanner getResults(Configuration hbaseConf, Connection conn,
-      FilterList filterList) throws IOException {
-    // Scan through part of the table to find the entities belong to one app
-    // and one type
-    Scan scan = new Scan();
-    TimelineReaderContext context = getContext();
-    RowKeyPrefix<EntityRowKey> entityRowKeyPrefix = null;
-    // default mode, will always scans from beginning of entity type.
-    if (getFilters() == null || getFilters().getFromId() == null) {
-      entityRowKeyPrefix = new EntityRowKeyPrefix(context.getClusterId(),
-          context.getUserId(), context.getFlowName(), context.getFlowRunId(),
-          context.getAppId(), context.getEntityType(), null, null);
-      scan.setRowPrefixFilter(entityRowKeyPrefix.getRowKeyPrefix());
-    } else { // pagination mode, will scan from given entityIdPrefix!enitityId
-
-      EntityRowKey entityRowKey = null;
-      try {
-        entityRowKey =
-            EntityRowKey.parseRowKeyFromString(getFilters().getFromId());
-      } catch (IllegalArgumentException e) {
-        throw new BadRequestException("Invalid filter fromid is provided.");
-      }
-      if (!context.getClusterId().equals(entityRowKey.getClusterId())) {
-        throw new BadRequestException(
-            "fromid doesn't belong to clusterId=" + context.getClusterId());
-      }
-
-      // set start row
-      scan.setStartRow(entityRowKey.getRowKey());
-
-      // get the bytes for stop row
-      entityRowKeyPrefix = new EntityRowKeyPrefix(context.getClusterId(),
-          context.getUserId(), context.getFlowName(), context.getFlowRunId(),
-          context.getAppId(), context.getEntityType(), null, null);
-
-      // set stop row
-      scan.setStopRow(
-          HBaseTimelineStorageUtils.calculateTheClosestNextRowKeyForPrefix(
-              entityRowKeyPrefix.getRowKeyPrefix()));
-
-      // set page filter to limit. This filter has to set only in pagination
-      // mode.
-      filterList.addFilter(new PageFilter(getFilters().getLimit()));
-    }
-    setMetricsTimeRange(scan);
-    scan.setMaxVersions(getDataToRetrieve().getMetricsLimit());
-    if (filterList != null && !filterList.getFilters().isEmpty()) {
-      scan.setFilter(filterList);
-    }
-    return getTable().getResultScanner(hbaseConf, conn, scan);
-  }
-
-  @Override
-  protected TimelineEntity parseEntity(Result result) throws IOException {
-    if (result == null || result.isEmpty()) {
-      return null;
-    }
-    TimelineEntity entity = new TimelineEntity();
-    EntityRowKey parseRowKey = EntityRowKey.parseRowKey(result.getRow());
-    entity.setType(parseRowKey.getEntityType());
-    entity.setId(parseRowKey.getEntityId());
-    entity.setIdPrefix(parseRowKey.getEntityIdPrefix().longValue());
-
-    TimelineEntityFilters filters = getFilters();
-    // fetch created time
-    Long createdTime = (Long) EntityColumn.CREATED_TIME.readResult(result);
-    entity.setCreatedTime(createdTime);
-
-    EnumSet<Field> fieldsToRetrieve = 
getDataToRetrieve().getFieldsToRetrieve();
-    // fetch is related to entities and match isRelatedTo filter. If 
isRelatedTo
-    // filters do not match, entity would be dropped. We have to match filters
-    // locally as relevant HBase filters to filter out rows on the basis of
-    // isRelatedTo are not set in HBase scan.
-    boolean checkIsRelatedTo =
-        !isSingleEntityRead() && filters.getIsRelatedTo() != null
-            && filters.getIsRelatedTo().getFilterList().size() > 0;
-    if (hasField(fieldsToRetrieve, Field.IS_RELATED_TO) || checkIsRelatedTo) {
-      readRelationship(entity, result, EntityColumnPrefix.IS_RELATED_TO, true);
-      if (checkIsRelatedTo
-          && !TimelineStorageUtils.matchIsRelatedTo(entity,
-              filters.getIsRelatedTo())) {
-        return null;
-      }
-      if (!hasField(fieldsToRetrieve, Field.IS_RELATED_TO)) {
-        entity.getIsRelatedToEntities().clear();
-      }
-    }
-
-    // fetch relates to entities and match relatesTo filter. If relatesTo
-    // filters do not match, entity would be dropped. We have to match filters
-    // locally as relevant HBase filters to filter out rows on the basis of
-    // relatesTo are not set in HBase scan.
-    boolean checkRelatesTo =
-        !isSingleEntityRead() && filters.getRelatesTo() != null
-            && filters.getRelatesTo().getFilterList().size() > 0;
-    if (hasField(fieldsToRetrieve, Field.RELATES_TO)
-        || checkRelatesTo) {
-      readRelationship(entity, result, EntityColumnPrefix.RELATES_TO, false);
-      if (checkRelatesTo
-          && !TimelineStorageUtils.matchRelatesTo(entity,
-              filters.getRelatesTo())) {
-        return null;
-      }
-      if (!hasField(fieldsToRetrieve, Field.RELATES_TO)) {
-        entity.getRelatesToEntities().clear();
-      }
-    }
-
-    // fetch info if fieldsToRetrieve contains INFO or ALL.
-    if (hasField(fieldsToRetrieve, Field.INFO)) {
-      readKeyValuePairs(entity, result, EntityColumnPrefix.INFO, false);
-    }
-
-    // fetch configs if fieldsToRetrieve contains CONFIGS or ALL.
-    if (hasField(fieldsToRetrieve, Field.CONFIGS)) {
-      readKeyValuePairs(entity, result, EntityColumnPrefix.CONFIG, true);
-    }
-
-    // fetch events and match event filters if they exist. If event filters do
-    // not match, entity would be dropped. We have to match filters locally
-    // as relevant HBase filters to filter out rows on the basis of events
-    // are not set in HBase scan.
-    boolean checkEvents =
-        !isSingleEntityRead() && filters.getEventFilters() != null
-            && filters.getEventFilters().getFilterList().size() > 0;
-    if (hasField(fieldsToRetrieve, Field.EVENTS) || checkEvents) {
-      readEvents(entity, result, EntityColumnPrefix.EVENT);
-      if (checkEvents
-          && !TimelineStorageUtils.matchEventFilters(entity,
-              filters.getEventFilters())) {
-        return null;
-      }
-      if (!hasField(fieldsToRetrieve, Field.EVENTS)) {
-        entity.getEvents().clear();
-      }
-    }
-
-    // fetch metrics if fieldsToRetrieve contains METRICS or ALL.
-    if (hasField(fieldsToRetrieve, Field.METRICS)) {
-      readMetrics(entity, result, EntityColumnPrefix.METRIC);
-    }
-
-    entity.getInfo().put(TimelineReaderUtils.FROMID_KEY,
-        parseRowKey.getRowKeyAsString());
-    return entity;
-  }
-
-  /**
-   * Helper method for reading key-value pairs for either info or config.
-   *
-   * @param <T> Describes the type of column prefix.
-   * @param entity entity to fill.
-   * @param result result from HBase.
-   * @param prefix column prefix.
-   * @param isConfig if true, means we are reading configs, otherwise info.
-   * @throws IOException if any problem is encountered while reading result.
-   */
-  protected <T> void readKeyValuePairs(TimelineEntity entity, Result result,
-      ColumnPrefix<T> prefix, boolean isConfig) throws IOException {
-    // info and configuration are of type Map<String, Object or String>
-    Map<String, Object> columns =
-        prefix.readResults(result, stringKeyConverter);
-    if (isConfig) {
-      for (Map.Entry<String, Object> column : columns.entrySet()) {
-        entity.addConfig(column.getKey(), column.getValue().toString());
-      }
-    } else {
-      entity.addInfo(columns);
-    }
-  }
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to