http://git-wip-us.apache.org/repos/asf/hadoop/blob/673ab905/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java
deleted file mode 100644
index f6904c5..0000000
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java
+++ /dev/null
@@ -1,628 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.timelineservice.storage.reader;
-
-import java.io.IOException;
-import java.util.EnumSet;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.filter.BinaryComparator;
-import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
-import org.apache.hadoop.hbase.filter.FamilyFilter;
-import org.apache.hadoop.hbase.filter.FilterList;
-import org.apache.hadoop.hbase.filter.PageFilter;
-import org.apache.hadoop.hbase.filter.FilterList.Operator;
-import org.apache.hadoop.hbase.filter.QualifierFilter;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
-import 
org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve;
-import 
org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters;
-import 
org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
-import 
org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
-import 
org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterUtils;
-import 
org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
-import 
org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
-import 
org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
-import 
org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
-import 
org.apache.hadoop.yarn.server.timelineservice.storage.common.RowKeyPrefix;
-import 
org.apache.hadoop.yarn.server.timelineservice.storage.common.StringKeyConverter;
-import 
org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
-import 
org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn;
-import 
org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnFamily;
-import 
org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix;
-import 
org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey;
-import 
org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKeyPrefix;
-import 
org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
-
-import com.google.common.base.Preconditions;
-
-/**
- * Timeline entity reader for generic entities that are stored in the entity
- * table.
- */
-class GenericEntityReader extends TimelineEntityReader {
-  private static final EntityTable ENTITY_TABLE = new EntityTable();
-
-  /**
-   * Used to convert strings key components to and from storage format.
-   */
-  private final KeyConverter<String> stringKeyConverter =
-      new StringKeyConverter();
-
-  /**
-   * Creates a reader that is given entity filters in addition to the data to
-   * retrieve. All three arguments are passed unchanged to the superclass.
-   *
-   * @param ctxt timeline reader context for the read.
-   * @param entityFilters filters to apply to the entities being read.
-   * @param toRetrieve describes which fields, configs and metrics to return.
-   */
-  public GenericEntityReader(TimelineReaderContext ctxt,
-      TimelineEntityFilters entityFilters, TimelineDataToRetrieve toRetrieve) {
-    super(ctxt, entityFilters, toRetrieve);
-  }
-
-  /**
-   * Creates a reader without entity filters. Both arguments are passed
-   * unchanged to the superclass.
-   * NOTE(review): presumably this is the constructor used for single entity
-   * reads - confirm against the superclass.
-   *
-   * @param ctxt timeline reader context for the read.
-   * @param toRetrieve describes which fields, configs and metrics to return.
-   */
-  public GenericEntityReader(TimelineReaderContext ctxt,
-      TimelineDataToRetrieve toRetrieve) {
-    super(ctxt, toRetrieve);
-  }
-
-  /**
-   * Uses the {@link EntityTable}.
-   *
-   * @return the {@link EntityTable} instance shared by all readers of this
-   *         class.
-   */
-  protected BaseTable<?> getTable() {
-    return ENTITY_TABLE;
-  }
-
-  @Override
-  protected FilterList constructFilterListBasedOnFilters() throws IOException {
-    // For multiple entity reads the filters cannot be null here because
-    // augmentParams creates them when they are missing.
-    TimelineEntityFilters entityFilters = getFilters();
-    FilterList result = new FilterList();
-
-    // Restrict on created time only when the requested range differs from
-    // [0, Long.MAX_VALUE].
-    long begin = entityFilters.getCreatedTimeBegin();
-    long end = entityFilters.getCreatedTimeEnd();
-    boolean fullRange = (begin == 0 && end == Long.MAX_VALUE);
-    if (!fullRange) {
-      result.addFilter(
-          TimelineFilterUtils.createSingleColValueFiltersByRange(
-              EntityColumn.CREATED_TIME, begin, end));
-    }
-
-    // Metric filters, if any were specified.
-    TimelineFilterList metrics = entityFilters.getMetricFilters();
-    boolean hasMetricFilters =
-        metrics != null && !metrics.getFilterList().isEmpty();
-    if (hasMetricFilters) {
-      result.addFilter(TimelineFilterUtils.createHBaseFilterList(
-          EntityColumnPrefix.METRIC, metrics));
-    }
-
-    // Config filters, if any were specified.
-    TimelineFilterList configs = entityFilters.getConfigFilters();
-    boolean hasConfigFilters =
-        configs != null && !configs.getFilterList().isEmpty();
-    if (hasConfigFilters) {
-      result.addFilter(TimelineFilterUtils.createHBaseFilterList(
-          EntityColumnPrefix.CONFIG, configs));
-    }
-
-    // Info filters, if any were specified.
-    TimelineFilterList infos = entityFilters.getInfoFilters();
-    boolean hasInfoFilters =
-        infos != null && !infos.getFilterList().isEmpty();
-    if (hasInfoFilters) {
-      result.addFilter(TimelineFilterUtils.createHBaseFilterList(
-          EntityColumnPrefix.INFO, infos));
-    }
-    return result;
-  }
-
-  /**
-   * Tells whether only a subset of the event columns has to be fetched. That
-   * is the case when event filters are specified but the EVENTS field itself
-   * is not among the fields to retrieve.
-   *
-   * @param eventFilters event filters of the query, may be null.
-   * @param fieldsToRetrieve fields requested by the query.
-   * @return true if we need to fetch some of the columns, false otherwise.
-   */
-  private boolean fetchPartialEventCols(TimelineFilterList eventFilters,
-      EnumSet<Field> fieldsToRetrieve) {
-    if (eventFilters == null || eventFilters.getFilterList().isEmpty()) {
-      return false;
-    }
-    return !hasField(fieldsToRetrieve, Field.EVENTS);
-  }
-
-  /**
-   * Tells whether only a subset of the relates_to columns has to be fetched.
-   * That is the case when relatesTo filters are specified but the RELATES_TO
-   * field itself is not among the fields to retrieve.
-   *
-   * @param relatesTo relatesTo filters of the query, may be null.
-   * @param fieldsToRetrieve fields requested by the query.
-   * @return true if we need to fetch some of the columns, false otherwise.
-   */
-  private boolean fetchPartialRelatesToCols(TimelineFilterList relatesTo,
-      EnumSet<Field> fieldsToRetrieve) {
-    if (relatesTo == null || relatesTo.getFilterList().isEmpty()) {
-      return false;
-    }
-    return !hasField(fieldsToRetrieve, Field.RELATES_TO);
-  }
-
-  /**
-   * Tells whether only a subset of the is_related_to columns has to be
-   * fetched. That is the case when isRelatedTo filters are specified but the
-   * IS_RELATED_TO field itself is not among the fields to retrieve.
-   *
-   * @param isRelatedTo isRelatedTo filters of the query, may be null.
-   * @param fieldsToRetrieve fields requested by the query.
-   * @return true if we need to fetch some of the columns, false otherwise.
-   */
-  private boolean fetchPartialIsRelatedToCols(TimelineFilterList isRelatedTo,
-      EnumSet<Field> fieldsToRetrieve) {
-    if (isRelatedTo == null || isRelatedTo.getFilterList().isEmpty()) {
-      return false;
-    }
-    return !hasField(fieldsToRetrieve, Field.IS_RELATED_TO);
-  }
-
-  /**
-   * Decides, from the event, relatesTo and isRelatedTo filters, whether only
-   * some columns of the info column family have to be fetched.
-   *
-   * @return true, if we need to fetch only some of the columns, false if we
-   *         need to fetch all the columns under info column family.
-   */
-  protected boolean fetchPartialColsFromInfoFamily() {
-    EnumSet<Field> fields = getDataToRetrieve().getFieldsToRetrieve();
-    TimelineEntityFilters entityFilters = getFilters();
-    if (fetchPartialEventCols(entityFilters.getEventFilters(), fields)) {
-      return true;
-    }
-    if (fetchPartialRelatesToCols(entityFilters.getRelatesTo(), fields)) {
-      return true;
-    }
-    return fetchPartialIsRelatedToCols(entityFilters.getIsRelatedTo(),
-        fields);
-  }
-
-  /**
-   * Tells whether a filter list based on fields has to be created. One is
-   * needed when not all fields are to be retrieved, or when specific configs
-   * or metrics to retrieve are given. For multiple entity reads one is also
-   * needed when event filters or relationship (relatesTo/isRelatedTo)
-   * filters are specified for the query.
-   *
-   * @return true if we need to create the filter list, false otherwise.
-   */
-  protected boolean needCreateFilterListBasedOnFields() {
-    TimelineDataToRetrieve toRetrieve = getDataToRetrieve();
-    // Only a subset of the fields is wanted.
-    if (!toRetrieve.getFieldsToRetrieve().contains(Field.ALL)) {
-      return true;
-    }
-    // All fields are wanted, but specific configs were asked for.
-    if (toRetrieve.getConfsToRetrieve() != null
-        && !toRetrieve.getConfsToRetrieve().getFilterList().isEmpty()) {
-      return true;
-    }
-    // All fields are wanted, but specific metrics were asked for.
-    if (toRetrieve.getMetricsToRetrieve() != null
-        && !toRetrieve.getMetricsToRetrieve().getFilterList().isEmpty()) {
-      return true;
-    }
-    // Filters need to be checked only if we are reading multiple entities.
-    if (isSingleEntityRead()) {
-      return false;
-    }
-    TimelineEntityFilters entityFilters = getFilters();
-    TimelineFilterList events = entityFilters.getEventFilters();
-    if (events != null && !events.getFilterList().isEmpty()) {
-      return true;
-    }
-    TimelineFilterList isRelatedTo = entityFilters.getIsRelatedTo();
-    if (isRelatedTo != null && !isRelatedTo.getFilterList().isEmpty()) {
-      return true;
-    }
-    TimelineFilterList relatesTo = entityFilters.getRelatesTo();
-    return relatesTo != null && !relatesTo.getFilterList().isEmpty();
-  }
-
-  /**
-   * Appends one {@link QualifierFilter} per column of the entity table to the
-   * given list, each matching that column's qualifier for equality.
-   *
-   * @param list filter list to which qualifier filters have to be added.
-   */
-  protected void updateFixedColumns(FilterList list) {
-    for (EntityColumn fixedColumn : EntityColumn.values()) {
-      BinaryComparator qualifier =
-          new BinaryComparator(fixedColumn.getColumnQualifierBytes());
-      list.addFilter(new QualifierFilter(CompareOp.EQUAL, qualifier));
-    }
-  }
-
-  /**
-   * Builds the filter list that limits which column qualifiers of the info
-   * column family are returned in the result. The list uses
-   * {@link Operator#MUST_PASS_ONE}, so a column is returned when it matches
-   * any of the filters added here.
-   *
-   * @return filter list.
-   * @throws IOException if any problem occurs while creating filter list.
-   */
-  private FilterList createFilterListForColsOfInfoFamily() throws IOException {
-    FilterList colsFilter = new FilterList(Operator.MUST_PASS_ONE);
-    // The fixed columns of the entity table are always part of the list.
-    updateFixedColumns(colsFilter);
-    EnumSet<Field> fields = getDataToRetrieve().getFieldsToRetrieve();
-
-    // INFO: fetch every column with the INFO prefix when requested.
-    if (hasField(fields, Field.INFO)) {
-      colsFilter.addFilter(TimelineFilterUtils.createHBaseQualifierFilter(
-          CompareOp.EQUAL, EntityColumnPrefix.INFO));
-    }
-
-    // RELATES_TO: fetch the whole prefix when the field is requested.
-    // Otherwise, if relatesTo filters exist, fetch just the columns they
-    // refer to so the filters can be matched after rows come back from HBase.
-    TimelineFilterList relatesToFilters = getFilters().getRelatesTo();
-    if (hasField(fields, Field.RELATES_TO)) {
-      colsFilter.addFilter(TimelineFilterUtils.createHBaseQualifierFilter(
-          CompareOp.EQUAL, EntityColumnPrefix.RELATES_TO));
-    } else if (relatesToFilters != null
-        && !relatesToFilters.getFilterList().isEmpty()) {
-      Set<String> cols =
-          TimelineFilterUtils.fetchColumnsFromFilterList(relatesToFilters);
-      colsFilter.addFilter(createFiltersFromColumnQualifiers(
-          EntityColumnPrefix.RELATES_TO, cols));
-    }
-
-    // IS_RELATED_TO: same scheme as RELATES_TO.
-    TimelineFilterList isRelatedToFilters = getFilters().getIsRelatedTo();
-    if (hasField(fields, Field.IS_RELATED_TO)) {
-      colsFilter.addFilter(TimelineFilterUtils.createHBaseQualifierFilter(
-          CompareOp.EQUAL, EntityColumnPrefix.IS_RELATED_TO));
-    } else if (isRelatedToFilters != null
-        && !isRelatedToFilters.getFilterList().isEmpty()) {
-      Set<String> cols =
-          TimelineFilterUtils.fetchColumnsFromFilterList(isRelatedToFilters);
-      colsFilter.addFilter(createFiltersFromColumnQualifiers(
-          EntityColumnPrefix.IS_RELATED_TO, cols));
-    }
-
-    // EVENTS: same scheme, using the EVENT column prefix and event filters.
-    TimelineFilterList events = getFilters().getEventFilters();
-    if (hasField(fields, Field.EVENTS)) {
-      colsFilter.addFilter(TimelineFilterUtils.createHBaseQualifierFilter(
-          CompareOp.EQUAL, EntityColumnPrefix.EVENT));
-    } else if (events != null && !events.getFilterList().isEmpty()) {
-      Set<String> cols =
-          TimelineFilterUtils.fetchColumnsFromFilterList(events);
-      colsFilter.addFilter(createFiltersFromColumnQualifiers(
-          EntityColumnPrefix.EVENT, cols));
-    }
-    return colsFilter;
-  }
-
-  /**
-   * Adds NOT_EQUAL qualifier filters for the column prefixes of the info
-   * column family whose field is not among the fields to retrieve. The
-   * filters are appended to the list that already holds the filter for the
-   * info column family.
-   *
-   * @param infoColFamilyList filter list for info column family.
-   */
-  private void excludeFieldsFromInfoColFamily(FilterList infoColFamilyList) {
-    EnumSet<Field> requested = getDataToRetrieve().getFieldsToRetrieve();
-    // Field and its column prefix, index by index: events, info,
-    // is related to, relates to.
-    Field[] optionalFields = {
-        Field.EVENTS, Field.INFO, Field.IS_RELATED_TO, Field.RELATES_TO};
-    EntityColumnPrefix[] prefixes = {
-        EntityColumnPrefix.EVENT, EntityColumnPrefix.INFO,
-        EntityColumnPrefix.IS_RELATED_TO, EntityColumnPrefix.RELATES_TO};
-    for (int i = 0; i < optionalFields.length; i++) {
-      if (hasField(requested, optionalFields[i])) {
-        continue;
-      }
-      // Field not required, so leave its columns out.
-      infoColFamilyList.addFilter(
-          TimelineFilterUtils.createHBaseQualifierFilter(
-              CompareOp.NOT_EQUAL, prefixes[i]));
-    }
-  }
-
-  /**
-   * Appends the filters for configs and metrics to retrieve to the filter
-   * list based on fields.
-   *
-   * @param listBasedOnFields filter list based on fields.
-   * @throws IOException if any problem occurs while updating filter list.
-   */
-  private void updateFilterForConfsAndMetricsToRetrieve(
-      FilterList listBasedOnFields) throws IOException {
-    TimelineDataToRetrieve toRetrieve = getDataToRetrieve();
-    EnumSet<Field> fields = toRetrieve.getFieldsToRetrieve();
-
-    // If confsToRetrieve is specified, CONFIGS has already been added to the
-    // fields to retrieve in augmentParams(), even if it was not requested.
-    if (fields.contains(Field.CONFIGS)) {
-      listBasedOnFields.addFilter(
-          TimelineFilterUtils.createFilterForConfsOrMetricsToRetrieve(
-              toRetrieve.getConfsToRetrieve(), EntityColumnFamily.CONFIGS,
-              EntityColumnPrefix.CONFIG));
-    }
-
-    // Likewise, METRICS has already been added in augmentParams() when
-    // metricsToRetrieve is specified.
-    if (fields.contains(Field.METRICS)) {
-      listBasedOnFields.addFilter(
-          TimelineFilterUtils.createFilterForConfsOrMetricsToRetrieve(
-              toRetrieve.getMetricsToRetrieve(), EntityColumnFamily.METRICS,
-              EntityColumnPrefix.METRIC));
-    }
-  }
-
-  @Override
-  protected FilterList constructFilterListBasedOnFields() throws IOException {
-    if (!needCreateFilterListBasedOnFields()) {
-      // Every column is wanted, so no filter is needed.
-      return null;
-    }
-    // Start from a filter on the INFO column family and narrow it below.
-    BinaryComparator infoFamilyName =
-        new BinaryComparator(EntityColumnFamily.INFO.getBytes());
-    FilterList infoFamilyFilters = new FilterList();
-    infoFamilyFilters.addFilter(
-        new FamilyFilter(CompareOp.EQUAL, infoFamilyName));
-
-    boolean partialInfoCols =
-        !isSingleEntityRead() && fetchPartialColsFromInfoFamily();
-    if (partialInfoCols) {
-      // Only some of the columns of the info family have to be fetched.
-      infoFamilyFilters.addFilter(createFilterListForColsOfInfoFamily());
-    } else {
-      // Leave out the column prefixes of the info family that the fields to
-      // retrieve do not require.
-      excludeFieldsFromInfoColFamily(infoFamilyFilters);
-    }
-
-    FilterList fieldFilters = new FilterList(Operator.MUST_PASS_ONE);
-    fieldFilters.addFilter(infoFamilyFilters);
-    updateFilterForConfsAndMetricsToRetrieve(fieldFilters);
-    return fieldFilters;
-  }
-
-  @Override
-  protected void validateParams() {
-    TimelineReaderContext context = getContext();
-    Preconditions.checkNotNull(context, "context shouldn't be null");
-    Preconditions.checkNotNull(getDataToRetrieve(),
-        "data to retrieve shouldn't be null");
-    // Cluster, application and entity type are needed for every read.
-    Preconditions.checkNotNull(context.getClusterId(),
-        "clusterId shouldn't be null");
-    Preconditions.checkNotNull(context.getAppId(),
-        "appId shouldn't be null");
-    Preconditions.checkNotNull(context.getEntityType(),
-        "entityType shouldn't be null");
-    // The entity id is needed only when a single entity is read.
-    if (!isSingleEntityRead()) {
-      return;
-    }
-    Preconditions.checkNotNull(context.getEntityId(),
-        "entityId shouldn't be null");
-  }
-
-  @Override
-  protected void augmentParams(Configuration hbaseConf, Connection conn)
-      throws IOException {
-    defaultAugmentParams(hbaseConf, conn);
-    // When confsToRetrieve and/or metricsToRetrieve are specified, make sure
-    // configs/metrics are part of the fields to retrieve.
-    getDataToRetrieve().addFieldsBasedOnConfsAndMetricsToRetrieve();
-    if (isSingleEntityRead()) {
-      return;
-    }
-    // Filters are created, when null, only for multiple entity reads.
-    createFiltersIfNull();
-  }
-
-  @Override
-  protected Result getResult(Configuration hbaseConf, Connection conn,
-      FilterList filterList) throws IOException {
-    TimelineReaderContext context = getContext();
-    boolean hasFilters =
-        filterList != null && !filterList.getFilters().isEmpty();
-
-    if (context.getEntityIdPrefix() != null) {
-      // The id prefix is known, so the row key can be built and the row
-      // fetched directly with a Get.
-      EntityRowKey entityRowKey = new EntityRowKey(context.getClusterId(),
-          context.getUserId(), context.getFlowName(), context.getFlowRunId(),
-          context.getAppId(), context.getEntityType(),
-          context.getEntityIdPrefix(), context.getEntityId());
-      Get get = new Get(entityRowKey.getRowKey());
-      get.setMaxVersions(getDataToRetrieve().getMetricsLimit());
-      if (hasFilters) {
-        get.setFilter(filterList);
-      }
-      return getTable().getResult(hbaseConf, conn, get);
-    }
-
-    // No id prefix: do a range scan and keep the first row whose ID column
-    // equals the requested entity id.
-    FilterList scanFilter = new FilterList(Operator.MUST_PASS_ALL);
-    if (hasFilters) {
-      scanFilter.addFilter(filterList);
-    }
-    FilterList idMatch = new FilterList();
-    idMatch.addFilter(TimelineFilterUtils.createHBaseSingleColValueFilter(
-        EntityColumn.ID, context.getEntityId(), CompareOp.EQUAL));
-    idMatch.addFilter(new PageFilter(1));
-    scanFilter.addFilter(idMatch);
-
-    ResultScanner scanner = getResults(hbaseConf, conn, scanFilter);
-    try {
-      Iterator<Result> rows = scanner.iterator();
-      return rows.hasNext() ? rows.next() : null;
-    } finally {
-      scanner.close();
-    }
-  }
-
-  /**
-   * Scans the entity table for the entities that belong to one application
-   * and one entity type.
-   * <p>
-   * When there are no filters, or no fromIdPrefix is set, the scan covers
-   * every row under the entity type prefix. Otherwise (pagination mode) the
-   * scan starts at the row built from fromIdPrefix/fromId, stops at the end
-   * of the entity type prefix and is limited by a {@link PageFilter} sized by
-   * {@code getFilters().getLimit()}.
-   *
-   * @param hbaseConf HBase configuration passed on to the table.
-   * @param conn HBase connection passed on to the table.
-   * @param filterList filters to apply to the scan, may be null. In
-   *          pagination mode the page filter is appended to this list; a new
-   *          list is created when null was passed.
-   * @return scanner over the matching rows.
-   * @throws IOException if any problem occurs while setting up the scan.
-   */
-  @Override
-  protected ResultScanner getResults(Configuration hbaseConf, Connection conn,
-      FilterList filterList) throws IOException {
-    // Scan through part of the table to find the entities belong to one app
-    // and one type
-    Scan scan = new Scan();
-    TimelineReaderContext context = getContext();
-    TimelineEntityFilters filters = getFilters();
-    FilterList scanFilters = filterList;
-    // Prefix shared by all rows of the requested entity type. It is the scan
-    // prefix in default mode and the basis of the stop row in pagination
-    // mode.
-    RowKeyPrefix<EntityRowKey> entityTypePrefix =
-        new EntityRowKeyPrefix(context.getClusterId(),
-            context.getUserId(), context.getFlowName(),
-            context.getFlowRunId(), context.getAppId(),
-            context.getEntityType(), null, null);
-    if (filters == null || filters.getFromIdPrefix() == null) {
-      // default mode, will always scans from beginning of entity type.
-      scan.setRowPrefixFilter(entityTypePrefix.getRowKeyPrefix());
-    } else {
-      // pagination mode, will scan from given entityIdPrefix!entityId
-      RowKeyPrefix<EntityRowKey> startRowPrefix =
-          new EntityRowKeyPrefix(context.getClusterId(),
-              context.getUserId(), context.getFlowName(),
-              context.getFlowRunId(), context.getAppId(),
-              context.getEntityType(), filters.getFromIdPrefix(),
-              filters.getFromId());
-
-      // set start row
-      scan.setStartRow(startRowPrefix.getRowKeyPrefix());
-
-      // set stop row
-      scan.setStopRow(
-          HBaseTimelineStorageUtils.calculateTheClosestNextRowKeyForPrefix(
-              entityTypePrefix.getRowKeyPrefix()));
-
-      // set page filter to limit. This filter has to set only in pagination
-      // mode. The caller may pass no filter list at all (the check further
-      // down allows for null), so create one instead of failing with a
-      // NullPointerException.
-      // NOTE(review): the page filter is appended to the caller's list as it
-      // is; if that list is a MUST_PASS_ONE list the page limit is OR-ed
-      // with the other filters - confirm what callers pass here.
-      if (scanFilters == null) {
-        scanFilters = new FilterList();
-      }
-      scanFilters.addFilter(new PageFilter(filters.getLimit()));
-    }
-    scan.setMaxVersions(getDataToRetrieve().getMetricsLimit());
-    if (scanFilters != null && !scanFilters.getFilters().isEmpty()) {
-      scan.setFilter(scanFilters);
-    }
-    return getTable().getResultScanner(hbaseConf, conn, scan);
-  }
-
-  /**
-   * Converts one HBase row of the entity table into a {@link TimelineEntity}.
-   * Type, id and id prefix come from the row key; created time and the other
-   * parts are read from the columns, depending on the fields to retrieve.
-   * For multiple entity reads the isRelatedTo, relatesTo and event filters
-   * are matched here, and an entity that does not match is dropped.
-   *
-   * @param result row fetched from HBase, may be null or empty.
-   * @return the populated entity, or null if the result is null or empty or
-   *         the entity does not satisfy the relationship or event filters.
-   * @throws IOException if any problem is encountered while reading result.
-   */
-  @Override
-  protected TimelineEntity parseEntity(Result result) throws IOException {
-    if (result == null || result.isEmpty()) {
-      return null;
-    }
-    TimelineEntity entity = new TimelineEntity();
-    // Entity type, id and id prefix are encoded in the row key.
-    EntityRowKey parseRowKey = EntityRowKey.parseRowKey(result.getRow());
-    entity.setType(parseRowKey.getEntityType());
-    entity.setId(parseRowKey.getEntityId());
-    entity.setIdPrefix(parseRowKey.getEntityIdPrefix().longValue());
-
-    // Filters are only dereferenced below for multiple entity reads.
-    TimelineEntityFilters filters = getFilters();
-    // fetch created time
-    Long createdTime = (Long) EntityColumn.CREATED_TIME.readResult(result);
-    entity.setCreatedTime(createdTime);
-
-    EnumSet<Field> fieldsToRetrieve = 
getDataToRetrieve().getFieldsToRetrieve();
-    // fetch is related to entities and match isRelatedTo filter. If
-    // isRelatedTo filters do not match, entity would be dropped. We have to
-    // match filters locally as relevant HBase filters to filter out rows on
-    // the basis of isRelatedTo are not set in HBase scan.
-    boolean checkIsRelatedTo =
-        !isSingleEntityRead() && filters.getIsRelatedTo() != null
-            && filters.getIsRelatedTo().getFilterList().size() > 0;
-    if (hasField(fieldsToRetrieve, Field.IS_RELATED_TO) || checkIsRelatedTo) {
-      readRelationship(entity, result, EntityColumnPrefix.IS_RELATED_TO, true);
-      if (checkIsRelatedTo
-          && !TimelineStorageUtils.matchIsRelatedTo(entity,
-              filters.getIsRelatedTo())) {
-        return null;
-      }
-      // The relationships were read only to evaluate the filter; they were
-      // not requested, so remove them from the returned entity.
-      if (!hasField(fieldsToRetrieve, Field.IS_RELATED_TO)) {
-        entity.getIsRelatedToEntities().clear();
-      }
-    }
-
-    // fetch relates to entities and match relatesTo filter. If relatesTo
-    // filters do not match, entity would be dropped. We have to match filters
-    // locally as relevant HBase filters to filter out rows on the basis of
-    // relatesTo are not set in HBase scan.
-    boolean checkRelatesTo =
-        !isSingleEntityRead() && filters.getRelatesTo() != null
-            && filters.getRelatesTo().getFilterList().size() > 0;
-    if (hasField(fieldsToRetrieve, Field.RELATES_TO)
-        || checkRelatesTo) {
-      readRelationship(entity, result, EntityColumnPrefix.RELATES_TO, false);
-      if (checkRelatesTo
-          && !TimelineStorageUtils.matchRelatesTo(entity,
-              filters.getRelatesTo())) {
-        return null;
-      }
-      // Read only for filter matching and not requested, so remove them.
-      if (!hasField(fieldsToRetrieve, Field.RELATES_TO)) {
-        entity.getRelatesToEntities().clear();
-      }
-    }
-
-    // fetch info if fieldsToRetrieve contains INFO or ALL.
-    if (hasField(fieldsToRetrieve, Field.INFO)) {
-      readKeyValuePairs(entity, result, EntityColumnPrefix.INFO, false);
-    }
-
-    // fetch configs if fieldsToRetrieve contains CONFIGS or ALL.
-    if (hasField(fieldsToRetrieve, Field.CONFIGS)) {
-      readKeyValuePairs(entity, result, EntityColumnPrefix.CONFIG, true);
-    }
-
-    // fetch events and match event filters if they exist. If event filters do
-    // not match, entity would be dropped. We have to match filters locally
-    // as relevant HBase filters to filter out rows on the basis of events
-    // are not set in HBase scan.
-    boolean checkEvents =
-        !isSingleEntityRead() && filters.getEventFilters() != null
-            && filters.getEventFilters().getFilterList().size() > 0;
-    if (hasField(fieldsToRetrieve, Field.EVENTS) || checkEvents) {
-      readEvents(entity, result, EntityColumnPrefix.EVENT);
-      if (checkEvents
-          && !TimelineStorageUtils.matchEventFilters(entity,
-              filters.getEventFilters())) {
-        return null;
-      }
-      // Read only for filter matching and not requested, so remove them.
-      if (!hasField(fieldsToRetrieve, Field.EVENTS)) {
-        entity.getEvents().clear();
-      }
-    }
-
-    // fetch metrics if fieldsToRetrieve contains METRICS or ALL.
-    if (hasField(fieldsToRetrieve, Field.METRICS)) {
-      readMetrics(entity, result, EntityColumnPrefix.METRIC);
-    }
-    return entity;
-  }
-
-  /**
-   * Reads the key-value pairs stored under the given column prefix and adds
-   * them to the entity, either as configs or as info.
-   *
-   * @param <T> Describes the type of column prefix.
-   * @param entity entity to fill.
-   * @param result result from HBase.
-   * @param prefix column prefix.
-   * @param isConfig if true, means we are reading configs, otherwise info.
-   * @throws IOException if any problem is encountered while reading result.
-   */
-  protected <T> void readKeyValuePairs(TimelineEntity entity, Result result,
-      ColumnPrefix<T> prefix, boolean isConfig) throws IOException {
-    Map<String, Object> keyValues =
-        prefix.readResults(result, stringKeyConverter);
-    if (!isConfig) {
-      // Info values are added as they are.
-      entity.addInfo(keyValues);
-      return;
-    }
-    // Config values are added in their string form.
-    for (Map.Entry<String, Object> keyValue : keyValues.entrySet()) {
-      entity.addConfig(keyValue.getKey(), keyValue.getValue().toString());
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/673ab905/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java
deleted file mode 100644
index 4c88cd3..0000000
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java
+++ /dev/null
@@ -1,458 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.timelineservice.storage.reader;
-
-import java.io.IOException;
-import java.util.EnumSet;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashSet;
-import java.util.Map;
-import java.util.NavigableMap;
-import java.util.Set;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.filter.BinaryPrefixComparator;
-import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
-import org.apache.hadoop.hbase.filter.FilterList;
-import org.apache.hadoop.hbase.filter.FilterList.Operator;
-import org.apache.hadoop.hbase.filter.QualifierFilter;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
-import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve;
-import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters;
-import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
-import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
-import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.EventColumnName;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.EventColumnNameConverter;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.StringKeyConverter;
-import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix;
-
-/**
- * The base class for reading and deserializing timeline entities from the
- * HBase storage. Different types can be defined for different types of the
- * entities that are being requested.
- */
-public abstract class TimelineEntityReader extends
-    AbstractTimelineStorageReader {
-  private static final Log LOG = LogFactory.getLog(TimelineEntityReader.class);
-
-  private final boolean singleEntityRead;
-  private TimelineDataToRetrieve dataToRetrieve;
-  // used only for multiple entity read mode
-  private TimelineEntityFilters filters;
-
-  /**
-   * Main table the entity reader uses.
-   */
-  private BaseTable<?> table;
-
-  /**
-   * Used to convert strings key components to and from storage format.
-   */
-  private final KeyConverter<String> stringKeyConverter =
-      new StringKeyConverter();
-
-  /**
-   * Instantiates a reader for multiple-entity reads.
-   *
-   * @param ctxt Reader context which defines the scope in which query has to be
-   *     made.
-   * @param entityFilters Filters which limit the entities returned.
-   * @param toRetrieve Data to retrieve for each entity.
-   */
-  protected TimelineEntityReader(TimelineReaderContext ctxt,
-      TimelineEntityFilters entityFilters, TimelineDataToRetrieve toRetrieve) {
-    super(ctxt);
-    this.singleEntityRead = false;
-    this.dataToRetrieve = toRetrieve;
-    this.filters = entityFilters;
-
-    this.setTable(getTable());
-  }
-
-  /**
-   * Instantiates a reader for single-entity reads.
-   *
-   * @param ctxt Reader context which defines the scope in which query has to be
-   *     made.
-   * @param toRetrieve Data to retrieve for each entity.
-   */
-  protected TimelineEntityReader(TimelineReaderContext ctxt,
-      TimelineDataToRetrieve toRetrieve) {
-    super(ctxt);
-    this.singleEntityRead = true;
-    this.dataToRetrieve = toRetrieve;
-
-    this.setTable(getTable());
-  }
-
-  /**
-   * Creates a {@link FilterList} based on fields, confs and metrics to
-   * retrieve. This filter list will be set in Scan/Get objects to trim down
-   * results fetched from HBase back-end storage. This is called only for
-   * multiple entity reads.
-   *
-   * @return a {@link FilterList} object.
-   * @throws IOException if any problem occurs while creating filter list.
-   */
-  protected abstract FilterList constructFilterListBasedOnFields()
-      throws IOException;
-
-  /**
-   * Creates a {@link FilterList} based on info, config and metric filters. This
-   * filter list will be set in HBase Get to trim down results fetched from
-   * HBase back-end storage.
-   *
-   * @return a {@link FilterList} object.
-   * @throws IOException if any problem occurs while creating filter list.
-   */
-  protected abstract FilterList constructFilterListBasedOnFilters()
-      throws IOException;
-
-  /**
-   * Combines filter lists created based on fields and based on filters.
-   *
-   * @return a {@link FilterList} object if it can be constructed. Returns null,
-   * if filter list cannot be created either on the basis of filters or on the
-   * basis of fields.
-   * @throws IOException if any problem occurs while creating filter list.
-   */
-  private FilterList createFilterList() throws IOException {
-    FilterList listBasedOnFilters = constructFilterListBasedOnFilters();
-    boolean hasListBasedOnFilters = listBasedOnFilters != null &&
-        !listBasedOnFilters.getFilters().isEmpty();
-    FilterList listBasedOnFields = constructFilterListBasedOnFields();
-    boolean hasListBasedOnFields = listBasedOnFields != null &&
-        !listBasedOnFields.getFilters().isEmpty();
-    // If filter lists based on both filters and fields can be created,
-    // combine them in a new filter list and return it.
-    // If either one of them has been created, return that filter list.
-    // Return null, if none of the filter lists can be created. This indicates
-    // that no filter list needs to be added to HBase Scan as filters are not
-    // specified for the query or only the default view of entity needs to be
-    // returned.
-    if (hasListBasedOnFilters && hasListBasedOnFields) {
-      FilterList list = new FilterList();
-      list.addFilter(listBasedOnFilters);
-      list.addFilter(listBasedOnFields);
-      return list;
-    } else if (hasListBasedOnFilters) {
-      return listBasedOnFilters;
-    } else if (hasListBasedOnFields) {
-      return listBasedOnFields;
-    }
-    return null;
-  }
-
-  protected TimelineDataToRetrieve getDataToRetrieve() {
-    return dataToRetrieve;
-  }
-
-  protected TimelineEntityFilters getFilters() {
-    return filters;
-  }
-
-  /**
-   * Create a {@link TimelineEntityFilters} object with default values for
-   * filters.
-   */
-  protected void createFiltersIfNull() {
-    if (filters == null) {
-      filters = new TimelineEntityFilters();
-    }
-  }
-
-  /**
-   * Reads and deserializes a single timeline entity from the HBase storage.
-   *
-   * @param hbaseConf HBase Configuration.
-   * @param conn HBase Connection.
-   * @return A <cite>TimelineEntity</cite> object.
-   * @throws IOException if there is any exception encountered while reading
-   *     entity.
-   */
-  public TimelineEntity readEntity(Configuration hbaseConf, Connection conn)
-      throws IOException {
-    validateParams();
-    augmentParams(hbaseConf, conn);
-
-    FilterList filterList = constructFilterListBasedOnFields();
-    if (LOG.isDebugEnabled() && filterList != null) {
-      LOG.debug("FilterList created for get is - " + filterList);
-    }
-    Result result = getResult(hbaseConf, conn, filterList);
-    if (result == null || result.isEmpty()) {
-      // Could not find a matching row.
-      LOG.info("Cannot find matching entity of type " +
-          getContext().getEntityType());
-      return null;
-    }
-    return parseEntity(result);
-  }
-
-  /**
-   * Reads and deserializes a set of timeline entities from the HBase storage.
-   * It goes through all the results available, and returns the number of
-   * entries as specified in the limit in the entity's natural sort order.
-   *
-   * @param hbaseConf HBase Configuration.
-   * @param conn HBase Connection.
-   * @return a set of <cite>TimelineEntity</cite> objects.
-   * @throws IOException if any exception is encountered while reading entities.
-   */
-  public Set<TimelineEntity> readEntities(Configuration hbaseConf,
-      Connection conn) throws IOException {
-    validateParams();
-    augmentParams(hbaseConf, conn);
-
-    Set<TimelineEntity> entities = new LinkedHashSet<>();
-    FilterList filterList = createFilterList();
-    if (LOG.isDebugEnabled() && filterList != null) {
-      LOG.debug("FilterList created for scan is - " + filterList);
-    }
-    ResultScanner results = getResults(hbaseConf, conn, filterList);
-    try {
-      for (Result result : results) {
-        TimelineEntity entity = parseEntity(result);
-        if (entity == null) {
-          continue;
-        }
-        entities.add(entity);
-        if (entities.size() == filters.getLimit()) {
-          break;
-        }
-      }
-      return entities;
-    } finally {
-      results.close();
-    }
-  }
-
-  /**
-   * Returns the main table to be used by the entity reader.
-   *
-   * @return A reference to the table.
-   */
-  protected BaseTable<?> getTable() {
-    return table;
-  }
-
-  /**
-   * Fetches a {@link Result} instance for a single-entity read.
-   *
-   * @param hbaseConf HBase Configuration.
-   * @param conn HBase Connection.
-   * @param filterList filter list which will be applied to HBase Get.
-   * @return the {@link Result} instance or null if no such record is found.
-   * @throws IOException if any exception is encountered while getting result.
-   */
-  protected abstract Result getResult(Configuration hbaseConf, Connection conn,
-      FilterList filterList) throws IOException;
-
-  /**
-   * Fetches a {@link ResultScanner} for a multi-entity read.
-   *
-   * @param hbaseConf HBase Configuration.
-   * @param conn HBase Connection.
-   * @param filterList filter list which will be applied to HBase Scan.
-   * @return the {@link ResultScanner} instance.
-   * @throws IOException if any exception is encountered while getting results.
-   */
-  protected abstract ResultScanner getResults(Configuration hbaseConf,
-      Connection conn, FilterList filterList) throws IOException;
-
-  /**
-   * Parses the result retrieved from HBase backend and convert it into a
-   * {@link TimelineEntity} object.
-   *
-   * @param result Single row result of a Get/Scan.
-   * @return the <cite>TimelineEntity</cite> instance or null if the entity is
-   *     filtered.
-   * @throws IOException if any exception is encountered while parsing entity.
-   */
-  protected abstract TimelineEntity parseEntity(Result result)
-      throws IOException;
-
-  /**
-   * Helper method for reading and deserializing {@link TimelineMetric} objects
-   * using the specified column prefix. The timeline metrics then are added to
-   * the given timeline entity.
-   *
-   * @param entity {@link TimelineEntity} object.
-   * @param result {@link Result} object retrieved from backend.
-   * @param columnPrefix Metric column prefix
-   * @throws IOException if any exception is encountered while reading metrics.
-   */
-  protected void readMetrics(TimelineEntity entity, Result result,
-      ColumnPrefix<?> columnPrefix) throws IOException {
-    NavigableMap<String, NavigableMap<Long, Number>> metricsResult =
-        columnPrefix.readResultsWithTimestamps(
-            result, stringKeyConverter);
-    for (Map.Entry<String, NavigableMap<Long, Number>> metricResult:
-        metricsResult.entrySet()) {
-      TimelineMetric metric = new TimelineMetric();
-      metric.setId(metricResult.getKey());
-      // Simply assume that if the value set contains more than 1 elements, the
-      // metric is a TIME_SERIES metric, otherwise, it's a SINGLE_VALUE metric
-      TimelineMetric.Type metricType = metricResult.getValue().size() > 1 ?
-          TimelineMetric.Type.TIME_SERIES : TimelineMetric.Type.SINGLE_VALUE;
-      metric.setType(metricType);
-      metric.addValues(metricResult.getValue());
-      entity.addMetric(metric);
-    }
-  }
-
-  /**
-   * Checks whether the reader has been created to fetch single entity or
-   * multiple entities.
-   *
-   * @return true, if query is for single entity, false otherwise.
-   */
-  public boolean isSingleEntityRead() {
-    return singleEntityRead;
-  }
-
-  protected void setTable(BaseTable<?> baseTable) {
-    this.table = baseTable;
-  }
-
-  /**
-   * Check if we have a certain field amongst fields to retrieve. This method
-   * checks against {@link Field#ALL} as well because that would mean field
-   * passed needs to be matched.
-   *
-   * @param fieldsToRetrieve fields to be retrieved.
-   * @param requiredField fields to be checked in fieldsToRetrieve.
-   * @return true if has the required field, false otherwise.
-   */
-  protected boolean hasField(EnumSet<Field> fieldsToRetrieve,
-      Field requiredField) {
-    return fieldsToRetrieve.contains(Field.ALL) ||
-        fieldsToRetrieve.contains(requiredField);
-  }
-
-  /**
-   * Create a filter list of qualifier filters based on passed set of columns.
-   *
-   * @param <T> Describes the type of column prefix.
-   * @param colPrefix Column Prefix.
-   * @param columns set of column qualifiers.
-   * @return filter list.
-   */
-  protected <T> FilterList createFiltersFromColumnQualifiers(
-      ColumnPrefix<T> colPrefix, Set<String> columns) {
-    FilterList list = new FilterList(Operator.MUST_PASS_ONE);
-    for (String column : columns) {
-      // For columns which have compound column qualifiers (eg. events), we need
-      // to include the required separator.
-      byte[] compoundColQual = createColQualifierPrefix(colPrefix, column);
-      list.addFilter(new QualifierFilter(CompareOp.EQUAL,
-          new BinaryPrefixComparator(colPrefix
-              .getColumnPrefixBytes(compoundColQual))));
-    }
-    return list;
-  }
-
-  protected <T> byte[] createColQualifierPrefix(ColumnPrefix<T> colPrefix,
-      String column) {
-    if (colPrefix == ApplicationColumnPrefix.EVENT
-        || colPrefix == EntityColumnPrefix.EVENT) {
-      return new EventColumnName(column, null, null).getColumnQualifier();
-    } else {
-      return stringKeyConverter.encode(column);
-    }
-  }
-
-  /**
-   * Helper method for reading relationship.
-   *
-   * @param <T> Describes the type of column prefix.
-   * @param entity entity to fill.
-   * @param result result from HBase.
-   * @param prefix column prefix.
-   * @param isRelatedTo if true, means relationship is to be added to
-   *          isRelatedTo, otherwise its added to relatesTo.
-   * @throws IOException if any problem is encountered while reading result.
-   */
-  protected <T> void readRelationship(TimelineEntity entity, Result result,
-      ColumnPrefix<T> prefix, boolean isRelatedTo) throws IOException {
-    // isRelatedTo and relatesTo are of type Map<String, Set<String>>
-    Map<String, Object> columns =
-        prefix.readResults(result, stringKeyConverter);
-    for (Map.Entry<String, Object> column : columns.entrySet()) {
-      for (String id : Separator.VALUES.splitEncoded(column.getValue()
-          .toString())) {
-        if (isRelatedTo) {
-          entity.addIsRelatedToEntity(column.getKey(), id);
-        } else {
-          entity.addRelatesToEntity(column.getKey(), id);
-        }
-      }
-    }
-  }
-
-  /**
-   * Read events from the entity table or the application table. The column name
-   * is of the form "eventId=timestamp=infoKey" where "infoKey" may be omitted
-   * if there is no info associated with the event.
-   *
-   * @param <T> Describes the type of column prefix.
-   * @param entity entity to fill.
-   * @param result HBase Result.
-   * @param prefix column prefix.
-   * @throws IOException if any problem is encountered while reading result.
-   */
-  protected static <T> void readEvents(TimelineEntity entity, Result result,
-      ColumnPrefix<T> prefix) throws IOException {
-    Map<String, TimelineEvent> eventsMap = new HashMap<>();
-    Map<EventColumnName, Object> eventsResult =
-        prefix.readResults(result, new EventColumnNameConverter());
-    for (Map.Entry<EventColumnName, Object>
-             eventResult : eventsResult.entrySet()) {
-      EventColumnName eventColumnName = eventResult.getKey();
-      String key = eventColumnName.getId() +
-          Long.toString(eventColumnName.getTimestamp());
-      // Retrieve previously seen event to add to it
-      TimelineEvent event = eventsMap.get(key);
-      if (event == null) {
-        // First time we're seeing this event, add it to the eventsMap
-        event = new TimelineEvent();
-        event.setId(eventColumnName.getId());
-        event.setTimestamp(eventColumnName.getTimestamp());
-        eventsMap.put(key, event);
-      }
-      if (eventColumnName.getInfoKey() != null) {
-        event.addInfo(eventColumnName.getInfoKey(), eventResult.getValue());
-      }
-    }
-    Set<TimelineEvent> eventsSet = new HashSet<>(eventsMap.values());
-    entity.addEvents(eventsSet);
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/673ab905/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReaderFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReaderFactory.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReaderFactory.java
deleted file mode 100644
index 16fffa4..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReaderFactory.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.timelineservice.storage.reader;
-
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
-import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve;
-import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters;
-import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
-
-/**
- * Factory methods for instantiating a timeline entity reader.
- */
-public final class TimelineEntityReaderFactory {
-  private TimelineEntityReaderFactory() {
-  }
-
-  /**
-   * Creates a timeline entity reader instance for reading a single entity with
-   * the specified input.
-   *
-   * @param context Reader context which defines the scope in which query has to
-   *     be made.
-   * @param dataToRetrieve Data to retrieve for each entity.
-   * @return An implementation of <cite>TimelineEntityReader</cite> object
-   *     depending on entity type.
-   */
-  public static TimelineEntityReader createSingleEntityReader(
-      TimelineReaderContext context, TimelineDataToRetrieve dataToRetrieve) {
-    // currently the types that are handled separate from the generic entity
-    // table are application, flow run, and flow activity entities
-    if (TimelineEntityType.YARN_APPLICATION.matches(context.getEntityType())) {
-      return new ApplicationEntityReader(context, dataToRetrieve);
-    } else if (TimelineEntityType.
-        YARN_FLOW_RUN.matches(context.getEntityType())) {
-      return new FlowRunEntityReader(context, dataToRetrieve);
-    } else if (TimelineEntityType.
-        YARN_FLOW_ACTIVITY.matches(context.getEntityType())) {
-      return new FlowActivityEntityReader(context, dataToRetrieve);
-    } else {
-      // assume we're dealing with a generic entity read
-      return new GenericEntityReader(context, dataToRetrieve);
-    }
-  }
-
-  /**
-   * Creates a timeline entity reader instance for reading set of entities with
-   * the specified input and predicates.
-   *
-   * @param context Reader context which defines the scope in which query has to
-   *     be made.
-   * @param filters Filters which limit the entities returned.
-   * @param dataToRetrieve Data to retrieve for each entity.
-   * @return An implementation of <cite>TimelineEntityReader</cite> object
-   *     depending on entity type.
-   */
-  public static TimelineEntityReader createMultipleEntitiesReader(
-      TimelineReaderContext context, TimelineEntityFilters filters,
-      TimelineDataToRetrieve dataToRetrieve) {
-    // currently the types that are handled separate from the generic entity
-    // table are application, flow run, and flow activity entities
-    if (TimelineEntityType.YARN_APPLICATION.matches(context.getEntityType())) {
-      return new ApplicationEntityReader(context, filters, dataToRetrieve);
-    } else if (TimelineEntityType.
-        YARN_FLOW_ACTIVITY.matches(context.getEntityType())) {
-      return new FlowActivityEntityReader(context, filters, dataToRetrieve);
-    } else if (TimelineEntityType.
-        YARN_FLOW_RUN.matches(context.getEntityType())) {
-      return new FlowRunEntityReader(context, filters, dataToRetrieve);
-    } else {
-      // assume we're dealing with a generic entity read
-      return new GenericEntityReader(context, filters, dataToRetrieve);
-    }
-  }
-
-  /**
-   * Creates a timeline entity type reader that will read all available entity
-   * types within the specified context.
-   *
-   * @param context Reader context which defines the scope in which query has to
-   *                be made. Limited to application level only.
-   * @return an <cite>EntityTypeReader</cite> object
-   */
-  public static EntityTypeReader createEntityTypeReader(
-      TimelineReaderContext context) {
-    return new EntityTypeReader(context);
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/673ab905/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/package-info.java
deleted file mode 100644
index 9814d6d..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/package-info.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Package org.apache.hadoop.yarn.server.timelineservice.storage.reader
- * contains classes used to read entities from backend based on query type.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-package org.apache.hadoop.yarn.server.timelineservice.storage.reader;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/673ab905/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestKeyConverters.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestKeyConverters.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestKeyConverters.java
deleted file mode 100644
index 58df970..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestKeyConverters.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.timelineservice.storage.common;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.junit.Test;
-
-public class TestKeyConverters {
-
-  @Test
-  public void testAppIdKeyConverter() {
-    AppIdKeyConverter appIdKeyConverter = new AppIdKeyConverter();
-    long currentTs = System.currentTimeMillis();
-    ApplicationId appId1 = ApplicationId.newInstance(currentTs, 1);
-    ApplicationId appId2 = ApplicationId.newInstance(currentTs, 2);
-    ApplicationId appId3 = ApplicationId.newInstance(currentTs + 300, 1);
-    String appIdStr1 = appId1.toString();
-    String appIdStr2 = appId2.toString();
-    String appIdStr3 = appId3.toString();
-    byte[] appIdBytes1 = appIdKeyConverter.encode(appIdStr1);
-    byte[] appIdBytes2 = appIdKeyConverter.encode(appIdStr2);
-    byte[] appIdBytes3 = appIdKeyConverter.encode(appIdStr3);
-    // App ids' should be encoded in a manner wherein descending order
-    // is maintained.
-    assertTrue(
-        "Ordering of app ids' is incorrect",
-        Bytes.compareTo(appIdBytes1, appIdBytes2) > 0
-            && Bytes.compareTo(appIdBytes1, appIdBytes3) > 0
-            && Bytes.compareTo(appIdBytes2, appIdBytes3) > 0);
-    String decodedAppId1 = appIdKeyConverter.decode(appIdBytes1);
-    String decodedAppId2 = appIdKeyConverter.decode(appIdBytes2);
-    String decodedAppId3 = appIdKeyConverter.decode(appIdBytes3);
-    assertTrue("Decoded app id is not same as the app id encoded",
-        appIdStr1.equals(decodedAppId1));
-    assertTrue("Decoded app id is not same as the app id encoded",
-        appIdStr2.equals(decodedAppId2));
-    assertTrue("Decoded app id is not same as the app id encoded",
-        appIdStr3.equals(decodedAppId3));
-  }
-
-  @Test
-  public void testEventColumnNameConverter() {
-    String eventId = "=foo_=eve=nt=";
-    byte[] valSepBytes = Bytes.toBytes(Separator.VALUES.getValue());
-    byte[] maxByteArr =
-        Bytes.createMaxByteArray(Bytes.SIZEOF_LONG - valSepBytes.length);
-    byte[] ts = Bytes.add(valSepBytes, maxByteArr);
-    Long eventTs = Bytes.toLong(ts);
-    byte[] byteEventColName =
-        new EventColumnName(eventId, eventTs, null).getColumnQualifier();
-    KeyConverter<EventColumnName> eventColumnNameConverter =
-        new EventColumnNameConverter();
-    EventColumnName eventColName =
-        eventColumnNameConverter.decode(byteEventColName);
-    assertEquals(eventId, eventColName.getId());
-    assertEquals(eventTs, eventColName.getTimestamp());
-    assertNull(eventColName.getInfoKey());
-
-    String infoKey = "f=oo_event_in=fo=_key";
-    byteEventColName =
-        new EventColumnName(eventId, eventTs, infoKey).getColumnQualifier();
-    eventColName = eventColumnNameConverter.decode(byteEventColName);
-    assertEquals(eventId, eventColName.getId());
-    assertEquals(eventTs, eventColName.getTimestamp());
-    assertEquals(infoKey, eventColName.getInfoKey());
-  }
-
-  @Test
-  public void testLongKeyConverter() {
-    LongKeyConverter longKeyConverter = new LongKeyConverter();
-    confirmLongKeyConverter(longKeyConverter, Long.MIN_VALUE);
-    confirmLongKeyConverter(longKeyConverter, -1234567890L);
-    confirmLongKeyConverter(longKeyConverter, -128L);
-    confirmLongKeyConverter(longKeyConverter, -127L);
-    confirmLongKeyConverter(longKeyConverter, -1L);
-    confirmLongKeyConverter(longKeyConverter, 0L);
-    confirmLongKeyConverter(longKeyConverter, 1L);
-    confirmLongKeyConverter(longKeyConverter, 127L);
-    confirmLongKeyConverter(longKeyConverter, 128L);
-    confirmLongKeyConverter(longKeyConverter, 1234567890L);
-    confirmLongKeyConverter(longKeyConverter, Long.MAX_VALUE);
-  }
-
-  private void confirmLongKeyConverter(LongKeyConverter longKeyConverter,
-      Long testValue) {
-    Long decoded = longKeyConverter.decode(longKeyConverter.encode(testValue));
-    assertEquals(testValue, decoded);
-  }
-
-  @Test
-  public void testStringKeyConverter() {
-    StringKeyConverter stringKeyConverter = new StringKeyConverter();
-    String phrase = "QuackAttack now!";
-
-    for (int i = 0; i < phrase.length(); i++) {
-      String sub = phrase.substring(i, phrase.length());
-      confirmStrignKeyConverter(stringKeyConverter, sub);
-      confirmStrignKeyConverter(stringKeyConverter, sub + sub);
-    }
-  }
-
-  private void confirmStrignKeyConverter(StringKeyConverter stringKeyConverter,
-      String testValue) {
-    String decoded =
-        stringKeyConverter.decode(stringKeyConverter.encode(testValue));
-    assertEquals(testValue, decoded);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/673ab905/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestRowKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestRowKeys.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestRowKeys.java
deleted file mode 100644
index cbd2273..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestRowKeys.java
+++ /dev/null
@@ -1,250 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.timelineservice.storage.common;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
-import 
org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey;
-import 
org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKeyPrefix;
-import 
org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey;
-import 
org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey;
-import 
org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKeyPrefix;
-import 
org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKey;
-import 
org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKeyPrefix;
-import 
org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKey;
-import org.junit.Test;
-
-
-public class TestRowKeys {
-
-  private final static String QUALIFIER_SEP = Separator.QUALIFIERS.getValue();
-  private final static byte[] QUALIFIER_SEP_BYTES = Bytes
-      .toBytes(QUALIFIER_SEP);
-  private final static String CLUSTER = "cl" + QUALIFIER_SEP + "uster";
-  private final static String USER = QUALIFIER_SEP + "user";
-  private final static String FLOW_NAME = "dummy_" + QUALIFIER_SEP + "flow"
-      + QUALIFIER_SEP;
-  private final static Long FLOW_RUN_ID;
-  private final static String APPLICATION_ID;
-  static {
-    long runid = Long.MAX_VALUE - 900L;
-    byte[] longMaxByteArr = Bytes.toBytes(Long.MAX_VALUE);
-    byte[] byteArr = Bytes.toBytes(runid);
-    int sepByteLen = QUALIFIER_SEP_BYTES.length;
-    if (sepByteLen <= byteArr.length) {
-      for (int i = 0; i < sepByteLen; i++) {
-        byteArr[i] = (byte) (longMaxByteArr[i] - QUALIFIER_SEP_BYTES[i]);
-      }
-    }
-    FLOW_RUN_ID = Bytes.toLong(byteArr);
-    long clusterTs = System.currentTimeMillis();
-    byteArr = Bytes.toBytes(clusterTs);
-    if (sepByteLen <= byteArr.length) {
-      for (int i = 0; i < sepByteLen; i++) {
-        byteArr[byteArr.length - sepByteLen + i] =
-            (byte) (longMaxByteArr[byteArr.length - sepByteLen + i] -
-                QUALIFIER_SEP_BYTES[i]);
-      }
-    }
-    clusterTs = Bytes.toLong(byteArr);
-    int seqId = 222;
-    APPLICATION_ID = ApplicationId.newInstance(clusterTs, seqId).toString();
-  }
-
-  private static void verifyRowPrefixBytes(byte[] byteRowKeyPrefix) {
-    int sepLen = QUALIFIER_SEP_BYTES.length;
-    for (int i = 0; i < sepLen; i++) {
-      assertTrue(
-          "Row key prefix not encoded properly.",
-          byteRowKeyPrefix[byteRowKeyPrefix.length - sepLen + i] ==
-              QUALIFIER_SEP_BYTES[i]);
-    }
-  }
-
-  @Test
-  public void testApplicationRowKey() {
-    byte[] byteRowKey =
-        new ApplicationRowKey(CLUSTER, USER, FLOW_NAME, FLOW_RUN_ID,
-            APPLICATION_ID).getRowKey();
-    ApplicationRowKey rowKey = ApplicationRowKey.parseRowKey(byteRowKey);
-    assertEquals(CLUSTER, rowKey.getClusterId());
-    assertEquals(USER, rowKey.getUserId());
-    assertEquals(FLOW_NAME, rowKey.getFlowName());
-    assertEquals(FLOW_RUN_ID, rowKey.getFlowRunId());
-    assertEquals(APPLICATION_ID, rowKey.getAppId());
-
-    byte[] byteRowKeyPrefix =
-        new ApplicationRowKeyPrefix(CLUSTER, USER, FLOW_NAME, FLOW_RUN_ID)
-            .getRowKeyPrefix();
-    byte[][] splits =
-        Separator.QUALIFIERS.split(byteRowKeyPrefix,
-            new int[] {Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE,
-                Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG,
-                Separator.VARIABLE_SIZE});
-    assertEquals(5, splits.length);
-    assertEquals(0, splits[4].length);
-    assertEquals(FLOW_NAME,
-        Separator.QUALIFIERS.decode(Bytes.toString(splits[2])));
-    assertEquals(FLOW_RUN_ID,
-        (Long) LongConverter.invertLong(Bytes.toLong(splits[3])));
-    verifyRowPrefixBytes(byteRowKeyPrefix);
-
-    byteRowKeyPrefix =
-        new ApplicationRowKeyPrefix(CLUSTER, USER, 
FLOW_NAME).getRowKeyPrefix();
-    splits =
-        Separator.QUALIFIERS.split(byteRowKeyPrefix, new int[] {
-            Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE,
-            Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE });
-    assertEquals(4, splits.length);
-    assertEquals(0, splits[3].length);
-    assertEquals(FLOW_NAME,
-        Separator.QUALIFIERS.decode(Bytes.toString(splits[2])));
-    verifyRowPrefixBytes(byteRowKeyPrefix);
-  }
-
-  /**
-   * Tests the converters indirectly through the public methods of the
-   * corresponding rowkey.
-   */
-  @Test
-  public void testAppToFlowRowKey() {
-    byte[] byteRowKey = new AppToFlowRowKey(APPLICATION_ID).getRowKey();
-    AppToFlowRowKey rowKey = AppToFlowRowKey.parseRowKey(byteRowKey);
-    assertEquals(APPLICATION_ID, rowKey.getAppId());
-  }
-
-  @Test
-  public void testEntityRowKey() {
-    TimelineEntity entity = new TimelineEntity();
-    entity.setId("!ent!ity!!id!");
-    entity.setType("entity!Type");
-    entity.setIdPrefix(54321);
-
-    byte[] byteRowKey =
-        new EntityRowKey(CLUSTER, USER, FLOW_NAME, FLOW_RUN_ID, APPLICATION_ID,
-            entity.getType(), entity.getIdPrefix(),
-            entity.getId()).getRowKey();
-    EntityRowKey rowKey = EntityRowKey.parseRowKey(byteRowKey);
-    assertEquals(CLUSTER, rowKey.getClusterId());
-    assertEquals(USER, rowKey.getUserId());
-    assertEquals(FLOW_NAME, rowKey.getFlowName());
-    assertEquals(FLOW_RUN_ID, rowKey.getFlowRunId());
-    assertEquals(APPLICATION_ID, rowKey.getAppId());
-    assertEquals(entity.getType(), rowKey.getEntityType());
-    assertEquals(entity.getIdPrefix(), rowKey.getEntityIdPrefix().longValue());
-    assertEquals(entity.getId(), rowKey.getEntityId());
-
-    byte[] byteRowKeyPrefix =
-        new EntityRowKeyPrefix(CLUSTER, USER, FLOW_NAME, FLOW_RUN_ID,
-            APPLICATION_ID, entity.getType(), null, null)
-                .getRowKeyPrefix();
-    byte[][] splits =
-        Separator.QUALIFIERS.split(
-            byteRowKeyPrefix,
-            new int[] {Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE,
-                Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG,
-                AppIdKeyConverter.getKeySize(), Separator.VARIABLE_SIZE,
-                Bytes.SIZEOF_LONG, Separator.VARIABLE_SIZE });
-    assertEquals(7, splits.length);
-    assertEquals(APPLICATION_ID, new AppIdKeyConverter().decode(splits[4]));
-    assertEquals(entity.getType(),
-        Separator.QUALIFIERS.decode(Bytes.toString(splits[5])));
-    verifyRowPrefixBytes(byteRowKeyPrefix);
-
-    byteRowKeyPrefix =
-        new EntityRowKeyPrefix(CLUSTER, USER, FLOW_NAME, FLOW_RUN_ID,
-            APPLICATION_ID).getRowKeyPrefix();
-    splits =
-        Separator.QUALIFIERS.split(
-            byteRowKeyPrefix,
-            new int[] {Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE,
-                Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG,
-                AppIdKeyConverter.getKeySize(), Separator.VARIABLE_SIZE});
-    assertEquals(6, splits.length);
-    assertEquals(0, splits[5].length);
-    AppIdKeyConverter appIdKeyConverter = new AppIdKeyConverter();
-    assertEquals(APPLICATION_ID, appIdKeyConverter.decode(splits[4]));
-    verifyRowPrefixBytes(byteRowKeyPrefix);
-  }
-
-  @Test
-  public void testFlowActivityRowKey() {
-    Long ts = 1459900830000L;
-    Long dayTimestamp = HBaseTimelineStorageUtils.getTopOfTheDayTimestamp(ts);
-    byte[] byteRowKey =
-        new FlowActivityRowKey(CLUSTER, ts, USER, FLOW_NAME).getRowKey();
-    FlowActivityRowKey rowKey = FlowActivityRowKey.parseRowKey(byteRowKey);
-    assertEquals(CLUSTER, rowKey.getClusterId());
-    assertEquals(dayTimestamp, rowKey.getDayTimestamp());
-    assertEquals(USER, rowKey.getUserId());
-    assertEquals(FLOW_NAME, rowKey.getFlowName());
-
-    byte[] byteRowKeyPrefix =
-        new FlowActivityRowKeyPrefix(CLUSTER).getRowKeyPrefix();
-    byte[][] splits =
-        Separator.QUALIFIERS.split(byteRowKeyPrefix, new int[] {
-            Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE });
-    assertEquals(2, splits.length);
-    assertEquals(0, splits[1].length);
-    assertEquals(CLUSTER,
-        Separator.QUALIFIERS.decode(Bytes.toString(splits[0])));
-    verifyRowPrefixBytes(byteRowKeyPrefix);
-
-    byteRowKeyPrefix =
-        new FlowActivityRowKeyPrefix(CLUSTER, ts).getRowKeyPrefix();
-    splits =
-        Separator.QUALIFIERS.split(byteRowKeyPrefix,
-            new int[] {Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG,
-                Separator.VARIABLE_SIZE});
-    assertEquals(3, splits.length);
-    assertEquals(0, splits[2].length);
-    assertEquals(CLUSTER,
-        Separator.QUALIFIERS.decode(Bytes.toString(splits[0])));
-    assertEquals(ts,
-        (Long) LongConverter.invertLong(Bytes.toLong(splits[1])));
-    verifyRowPrefixBytes(byteRowKeyPrefix);
-  }
-
-  @Test
-  public void testFlowRunRowKey() {
-    byte[] byteRowKey =
-        new FlowRunRowKey(CLUSTER, USER, FLOW_NAME, FLOW_RUN_ID).getRowKey();
-    FlowRunRowKey rowKey = FlowRunRowKey.parseRowKey(byteRowKey);
-    assertEquals(CLUSTER, rowKey.getClusterId());
-    assertEquals(USER, rowKey.getUserId());
-    assertEquals(FLOW_NAME, rowKey.getFlowName());
-    assertEquals(FLOW_RUN_ID, rowKey.getFlowRunId());
-
-    byte[] byteRowKeyPrefix =
-        new FlowRunRowKey(CLUSTER, USER, FLOW_NAME, null).getRowKey();
-    byte[][] splits =
-        Separator.QUALIFIERS.split(byteRowKeyPrefix, new int[] {
-            Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE,
-            Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE });
-    assertEquals(4, splits.length);
-    assertEquals(0, splits[3].length);
-    assertEquals(FLOW_NAME,
-        Separator.QUALIFIERS.decode(Bytes.toString(splits[2])));
-    verifyRowPrefixBytes(byteRowKeyPrefix);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/673ab905/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestSeparator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestSeparator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestSeparator.java
deleted file mode 100644
index 7d37206..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestSeparator.java
+++ /dev/null
@@ -1,215 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.hadoop.yarn.server.timelineservice.storage.common;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.hadoop.hbase.util.Bytes;
-import org.junit.Test;
-
-import com.google.common.collect.Iterables;
-
-public class TestSeparator {
-
-  private static String villain = "Dr. Heinz Doofenshmirtz";
-  private static String special =
-      ".   *   |   ?   +   \t   (   )   [   ]   {   }   ^   $  \\ \"  %";
-
-  /**
-   *
-   */
-  @Test
-  public void testEncodeDecodeString() {
-
-    for (Separator separator : Separator.values()) {
-      testEncodeDecode(separator, "");
-      testEncodeDecode(separator, " ");
-      testEncodeDecode(separator, "!");
-      testEncodeDecode(separator, "?");
-      testEncodeDecode(separator, "&");
-      testEncodeDecode(separator, "+");
-      testEncodeDecode(separator, "\t");
-      testEncodeDecode(separator, "Dr.");
-      testEncodeDecode(separator, "Heinz");
-      testEncodeDecode(separator, "Doofenshmirtz");
-      testEncodeDecode(separator, villain);
-      testEncodeDecode(separator, special);
-
-      assertNull(separator.encode(null));
-
-    }
-  }
-
-  private void testEncodeDecode(Separator separator, String token) {
-    String encoded = separator.encode(token);
-    String decoded = separator.decode(encoded);
-    String msg = "token:" + token + " separator:" + separator + ".";
-    assertEquals(msg, token, decoded);
-  }
-
-  @Test
-  public void testEncodeDecode() {
-    testEncodeDecode("Dr.", Separator.QUALIFIERS);
-    testEncodeDecode("Heinz", Separator.QUALIFIERS, Separator.QUALIFIERS);
-    testEncodeDecode("Doofenshmirtz", Separator.QUALIFIERS, null,
-        Separator.QUALIFIERS);
-    testEncodeDecode("&Perry", Separator.QUALIFIERS, Separator.VALUES, null);
-    testEncodeDecode("the ", Separator.QUALIFIERS, Separator.SPACE);
-    testEncodeDecode("Platypus...", (Separator) null);
-    testEncodeDecode("The what now ?!?", Separator.QUALIFIERS,
-        Separator.VALUES, Separator.SPACE);
-
-  }
-  @Test
-  public void testEncodedValues() {
-    testEncodeDecode("Double-escape %2$ and %9$ or %%2$ or %%3$, nor  %%%2$" +
-        "= no problem!",
-        Separator.QUALIFIERS, Separator.VALUES, Separator.SPACE, 
Separator.TAB);
-  }
-
-  @Test
-  public void testSplits() {
-    byte[] maxLongBytes = Bytes.toBytes(Long.MAX_VALUE);
-    byte[] maxIntBytes = Bytes.toBytes(Integer.MAX_VALUE);
-    for (Separator separator : Separator.values()) {
-      String str1 = "cl" + separator.getValue() + "us";
-      String str2 = separator.getValue() + "rst";
-      byte[] sepByteArr = Bytes.toBytes(separator.getValue());
-      byte[] longVal1Arr = Bytes.add(sepByteArr, Bytes.copy(maxLongBytes,
-          sepByteArr.length, Bytes.SIZEOF_LONG - sepByteArr.length));
-      byte[] intVal1Arr = Bytes.add(sepByteArr, Bytes.copy(maxIntBytes,
-          sepByteArr.length, Bytes.SIZEOF_INT - sepByteArr.length));
-      byte[] arr = separator.join(
-          Bytes.toBytes(separator.encode(str1)), longVal1Arr,
-          Bytes.toBytes(separator.encode(str2)), intVal1Arr);
-      int[] sizes = {Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG,
-          Separator.VARIABLE_SIZE, Bytes.SIZEOF_INT};
-      byte[][] splits = separator.split(arr, sizes);
-      assertEquals(4, splits.length);
-      assertEquals(str1, separator.decode(Bytes.toString(splits[0])));
-      assertEquals(Bytes.toLong(longVal1Arr), Bytes.toLong(splits[1]));
-      assertEquals(str2, separator.decode(Bytes.toString(splits[2])));
-      assertEquals(Bytes.toInt(intVal1Arr), Bytes.toInt(splits[3]));
-
-      longVal1Arr = Bytes.add(Bytes.copy(maxLongBytes, 0, Bytes.SIZEOF_LONG -
-          sepByteArr.length), sepByteArr);
-      intVal1Arr = Bytes.add(Bytes.copy(maxIntBytes, 0, Bytes.SIZEOF_INT -
-          sepByteArr.length), sepByteArr);
-      arr = separator.join(Bytes.toBytes(separator.encode(str1)), longVal1Arr,
-          Bytes.toBytes(separator.encode(str2)), intVal1Arr);
-      splits = separator.split(arr, sizes);
-      assertEquals(4, splits.length);
-      assertEquals(str1, separator.decode(Bytes.toString(splits[0])));
-      assertEquals(Bytes.toLong(longVal1Arr), Bytes.toLong(splits[1]));
-      assertEquals(str2, separator.decode(Bytes.toString(splits[2])));
-      assertEquals(Bytes.toInt(intVal1Arr), Bytes.toInt(splits[3]));
-
-      longVal1Arr = Bytes.add(sepByteArr, Bytes.copy(maxLongBytes,
-          sepByteArr.length, 4 - sepByteArr.length), sepByteArr);
-      longVal1Arr = Bytes.add(longVal1Arr, Bytes.copy(maxLongBytes, 4, 3 -
-              sepByteArr.length), sepByteArr);
-      arr = separator.join(Bytes.toBytes(separator.encode(str1)), longVal1Arr,
-          Bytes.toBytes(separator.encode(str2)), intVal1Arr);
-      splits = separator.split(arr, sizes);
-      assertEquals(4, splits.length);
-      assertEquals(str1, separator.decode(Bytes.toString(splits[0])));
-      assertEquals(Bytes.toLong(longVal1Arr), Bytes.toLong(splits[1]));
-      assertEquals(str2, separator.decode(Bytes.toString(splits[2])));
-      assertEquals(Bytes.toInt(intVal1Arr), Bytes.toInt(splits[3]));
-
-      arr = separator.join(Bytes.toBytes(separator.encode(str1)),
-          Bytes.toBytes(separator.encode(str2)), intVal1Arr, longVal1Arr);
-      int[] sizes1 = {Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE,
-          Bytes.SIZEOF_INT, Bytes.SIZEOF_LONG};
-      splits = separator.split(arr, sizes1);
-      assertEquals(4, splits.length);
-      assertEquals(str1, separator.decode(Bytes.toString(splits[0])));
-      assertEquals(str2, separator.decode(Bytes.toString(splits[1])));
-      assertEquals(Bytes.toInt(intVal1Arr), Bytes.toInt(splits[2]));
-      assertEquals(Bytes.toLong(longVal1Arr), Bytes.toLong(splits[3]));
-
-      try {
-        int[] sizes2 = {Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE,
-            Bytes.SIZEOF_INT, 7};
-        splits = separator.split(arr, sizes2);
-        fail("Exception should have been thrown.");
-      } catch (IllegalArgumentException e) {}
-
-      try {
-        int[] sizes2 = {Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE, 2,
-            Bytes.SIZEOF_LONG};
-        splits = separator.split(arr, sizes2);
-        fail("Exception should have been thrown.");
-      } catch (IllegalArgumentException e) {}
-    }
-  }
-
-  /**
-   * Simple test to encode and decode using the same separators and confirm that
-   * we end up with the same as what we started with.
-   *
-   * @param token
-   * @param separators
-   */
-  private static void testEncodeDecode(String token, Separator... separators) {
-    byte[] encoded = Separator.encode(token, separators);
-    String decoded = Separator.decode(encoded, separators);
-    assertEquals(token, decoded);
-  }
-
-  @Test
-  public void testJoinStripped() {
-    List<String> stringList = new ArrayList<String>(0);
-    stringList.add("nothing");
-
-    String joined = Separator.VALUES.joinEncoded(stringList);
-    Iterable<String> split = Separator.VALUES.splitEncoded(joined);
-    assertTrue(Iterables.elementsEqual(stringList, split));
-
-    stringList = new ArrayList<String>(3);
-    stringList.add("a");
-    stringList.add("b?");
-    stringList.add("c");
-
-    joined = Separator.VALUES.joinEncoded(stringList);
-    split = Separator.VALUES.splitEncoded(joined);
-    assertTrue(Iterables.elementsEqual(stringList, split));
-
-    String[] stringArray1 = {"else"};
-    joined = Separator.VALUES.joinEncoded(stringArray1);
-    split = Separator.VALUES.splitEncoded(joined);
-    assertTrue(Iterables.elementsEqual(Arrays.asList(stringArray1), split));
-
-    String[] stringArray2 = {"d", "e?", "f"};
-    joined = Separator.VALUES.joinEncoded(stringArray2);
-    split = Separator.VALUES.splitEncoded(joined);
-    assertTrue(Iterables.elementsEqual(Arrays.asList(stringArray2), split));
-
-    List<String> empty = new ArrayList<String>(0);
-    split = Separator.VALUES.splitEncoded(null);
-    assertTrue(Iterables.elementsEqual(empty, split));
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/673ab905/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/pom.xml
index 1b6b1d5..7dcdc02 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/pom.xml
@@ -44,6 +44,7 @@
     <module>hadoop-yarn-server-applicationhistoryservice</module>
     <module>hadoop-yarn-server-timeline-pluginstorage</module>
     <module>hadoop-yarn-server-timelineservice</module>
+    <module>hadoop-yarn-server-timelineservice-hbase</module>
     <module>hadoop-yarn-server-timelineservice-hbase-tests</module>
   </modules>
 </project>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/673ab905/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/TimelineServiceV2.md
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/TimelineServiceV2.md b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/TimelineServiceV2.md
index e53b05d..02b0562 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/TimelineServiceV2.md
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/TimelineServiceV2.md
@@ -223,7 +223,7 @@ is needed for the `flowrun` table creation in the schema creator. The default HD
 For example,
 
     hadoop fs -mkdir /hbase/coprocessor
-    hadoop fs -put hadoop-yarn-server-timelineservice-3.0.0-alpha1-SNAPSHOT.jar
+    hadoop fs -put hadoop-yarn-server-timelineservice-hbase-3.0.0-alpha1-SNAPSHOT.jar
            /hbase/coprocessor/hadoop-yarn-server-timelineservice.jar
 
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/673ab905/hadoop-yarn-project/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/pom.xml b/hadoop-yarn-project/pom.xml
index 7ddb31a..b5c2849 100644
--- a/hadoop-yarn-project/pom.xml
+++ b/hadoop-yarn-project/pom.xml
@@ -75,6 +75,10 @@
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-yarn-server-web-proxy</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-server-timelineservice-hbase</artifactId>
+    </dependency>
   </dependencies>
 
   <build>


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to