http://git-wip-us.apache.org/repos/asf/hadoop/blob/673ab905/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java deleted file mode 100644 index f6904c5..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java +++ /dev/null @@ -1,628 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.server.timelineservice.storage.reader; - -import java.io.IOException; -import java.util.EnumSet; -import java.util.Iterator; -import java.util.Map; -import java.util.Set; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.filter.BinaryComparator; -import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; -import org.apache.hadoop.hbase.filter.FamilyFilter; -import org.apache.hadoop.hbase.filter.FilterList; -import org.apache.hadoop.hbase.filter.PageFilter; -import org.apache.hadoop.hbase.filter.FilterList.Operator; -import org.apache.hadoop.hbase.filter.QualifierFilter; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; -import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve; -import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters; -import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext; -import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList; -import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterUtils; -import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.RowKeyPrefix; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.StringKeyConverter; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; -import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn; -import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnFamily; -import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix; -import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey; -import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKeyPrefix; -import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable; - -import com.google.common.base.Preconditions; - -/** - * Timeline entity reader for generic entities that are stored in the entity - * table. - */ -class GenericEntityReader extends TimelineEntityReader { - private static final EntityTable ENTITY_TABLE = new EntityTable(); - - /** - * Used to convert strings key components to and from storage format. - */ - private final KeyConverter<String> stringKeyConverter = - new StringKeyConverter(); - - public GenericEntityReader(TimelineReaderContext ctxt, - TimelineEntityFilters entityFilters, TimelineDataToRetrieve toRetrieve) { - super(ctxt, entityFilters, toRetrieve); - } - - public GenericEntityReader(TimelineReaderContext ctxt, - TimelineDataToRetrieve toRetrieve) { - super(ctxt, toRetrieve); - } - - /** - * Uses the {@link EntityTable}. - */ - protected BaseTable<?> getTable() { - return ENTITY_TABLE; - } - - @Override - protected FilterList constructFilterListBasedOnFilters() throws IOException { - // Filters here cannot be null for multiple entity reads as they are set in - // augmentParams if null. - FilterList listBasedOnFilters = new FilterList(); - TimelineEntityFilters filters = getFilters(); - // Create filter list based on created time range and add it to - // listBasedOnFilters. - long createdTimeBegin = filters.getCreatedTimeBegin(); - long createdTimeEnd = filters.getCreatedTimeEnd(); - if (createdTimeBegin != 0 || createdTimeEnd != Long.MAX_VALUE) { - listBasedOnFilters.addFilter(TimelineFilterUtils - .createSingleColValueFiltersByRange(EntityColumn.CREATED_TIME, - createdTimeBegin, createdTimeEnd)); - } - // Create filter list based on metric filters and add it to - // listBasedOnFilters. - TimelineFilterList metricFilters = filters.getMetricFilters(); - if (metricFilters != null && !metricFilters.getFilterList().isEmpty()) { - listBasedOnFilters.addFilter(TimelineFilterUtils.createHBaseFilterList( - EntityColumnPrefix.METRIC, metricFilters)); - } - // Create filter list based on config filters and add it to - // listBasedOnFilters. - TimelineFilterList configFilters = filters.getConfigFilters(); - if (configFilters != null && !configFilters.getFilterList().isEmpty()) { - listBasedOnFilters.addFilter(TimelineFilterUtils.createHBaseFilterList( - EntityColumnPrefix.CONFIG, configFilters)); - } - // Create filter list based on info filters and add it to listBasedOnFilters - TimelineFilterList infoFilters = filters.getInfoFilters(); - if (infoFilters != null && !infoFilters.getFilterList().isEmpty()) { - listBasedOnFilters.addFilter(TimelineFilterUtils.createHBaseFilterList( - EntityColumnPrefix.INFO, infoFilters)); - } - return listBasedOnFilters; - } - - /** - * Check if we need to fetch only some of the event columns. - * - * @return true if we need to fetch some of the columns, false otherwise. - */ - private boolean fetchPartialEventCols(TimelineFilterList eventFilters, - EnumSet<Field> fieldsToRetrieve) { - return (eventFilters != null && !eventFilters.getFilterList().isEmpty() && - !hasField(fieldsToRetrieve, Field.EVENTS)); - } - - /** - * Check if we need to fetch only some of the relates_to columns. - * - * @return true if we need to fetch some of the columns, false otherwise. - */ - private boolean fetchPartialRelatesToCols(TimelineFilterList relatesTo, - EnumSet<Field> fieldsToRetrieve) { - return (relatesTo != null && !relatesTo.getFilterList().isEmpty() && - !hasField(fieldsToRetrieve, Field.RELATES_TO)); - } - - /** - * Check if we need to fetch only some of the is_related_to columns. - * - * @return true if we need to fetch some of the columns, false otherwise. - */ - private boolean fetchPartialIsRelatedToCols(TimelineFilterList isRelatedTo, - EnumSet<Field> fieldsToRetrieve) { - return (isRelatedTo != null && !isRelatedTo.getFilterList().isEmpty() && - !hasField(fieldsToRetrieve, Field.IS_RELATED_TO)); - } - - /** - * Check if we need to fetch only some of the columns based on event filters, - * relatesto and isrelatedto from info family. - * - * @return true, if we need to fetch only some of the columns, false if we - * need to fetch all the columns under info column family. - */ - protected boolean fetchPartialColsFromInfoFamily() { - EnumSet<Field> fieldsToRetrieve = getDataToRetrieve().getFieldsToRetrieve(); - TimelineEntityFilters filters = getFilters(); - return fetchPartialEventCols(filters.getEventFilters(), fieldsToRetrieve) - || fetchPartialRelatesToCols(filters.getRelatesTo(), fieldsToRetrieve) - || fetchPartialIsRelatedToCols(filters.getIsRelatedTo(), - fieldsToRetrieve); - } - - /** - * Check if we need to create filter list based on fields. We need to create a - * filter list iff all fields need not be retrieved or we have some specific - * fields or metrics to retrieve. We also need to create a filter list if we - * have relationships(relatesTo/isRelatedTo) and event filters specified for - * the query. - * - * @return true if we need to create the filter list, false otherwise. - */ - protected boolean needCreateFilterListBasedOnFields() { - TimelineDataToRetrieve dataToRetrieve = getDataToRetrieve(); - // Check if all fields are to be retrieved or not. If all fields have to - // be retrieved, also check if we have some metrics or configs to - // retrieve specified for the query because then a filter list will have - // to be created. - boolean flag = - !dataToRetrieve.getFieldsToRetrieve().contains(Field.ALL) - || (dataToRetrieve.getConfsToRetrieve() != null && !dataToRetrieve - .getConfsToRetrieve().getFilterList().isEmpty()) - || (dataToRetrieve.getMetricsToRetrieve() != null && !dataToRetrieve - .getMetricsToRetrieve().getFilterList().isEmpty()); - // Filters need to be checked only if we are reading multiple entities. If - // condition above is false, we check if there are relationships(relatesTo/ - // isRelatedTo) and event filters specified for the query. - if (!flag && !isSingleEntityRead()) { - TimelineEntityFilters filters = getFilters(); - flag = - (filters.getEventFilters() != null && !filters.getEventFilters() - .getFilterList().isEmpty()) - || (filters.getIsRelatedTo() != null && !filters.getIsRelatedTo() - .getFilterList().isEmpty()) - || (filters.getRelatesTo() != null && !filters.getRelatesTo() - .getFilterList().isEmpty()); - } - return flag; - } - - /** - * Add {@link QualifierFilter} filters to filter list for each column of - * entity table. - * - * @param list filter list to which qualifier filters have to be added. - */ - protected void updateFixedColumns(FilterList list) { - for (EntityColumn column : EntityColumn.values()) { - list.addFilter(new QualifierFilter(CompareOp.EQUAL, new BinaryComparator( - column.getColumnQualifierBytes()))); - } - } - - /** - * Creates a filter list which indicates that only some of the column - * qualifiers in the info column family will be returned in result. - * - * @param isApplication If true, it means operations are to be performed for - * application table, otherwise for entity table. - * @return filter list. - * @throws IOException if any problem occurs while creating filter list. - */ - private FilterList createFilterListForColsOfInfoFamily() throws IOException { - FilterList infoFamilyColsFilter = new FilterList(Operator.MUST_PASS_ONE); - // Add filters for each column in entity table. - updateFixedColumns(infoFamilyColsFilter); - EnumSet<Field> fieldsToRetrieve = getDataToRetrieve().getFieldsToRetrieve(); - // If INFO field has to be retrieved, add a filter for fetching columns - // with INFO column prefix. - if (hasField(fieldsToRetrieve, Field.INFO)) { - infoFamilyColsFilter - .addFilter(TimelineFilterUtils.createHBaseQualifierFilter( - CompareOp.EQUAL, EntityColumnPrefix.INFO)); - } - TimelineFilterList relatesTo = getFilters().getRelatesTo(); - if (hasField(fieldsToRetrieve, Field.RELATES_TO)) { - // If RELATES_TO field has to be retrieved, add a filter for fetching - // columns with RELATES_TO column prefix. - infoFamilyColsFilter.addFilter(TimelineFilterUtils - .createHBaseQualifierFilter(CompareOp.EQUAL, - EntityColumnPrefix.RELATES_TO)); - } else if (relatesTo != null && !relatesTo.getFilterList().isEmpty()) { - // Even if fields to retrieve does not contain RELATES_TO, we still - // need to have a filter to fetch some of the column qualifiers if - // relatesTo filters are specified. relatesTo filters will then be - // matched after fetching rows from HBase. - Set<String> relatesToCols = - TimelineFilterUtils.fetchColumnsFromFilterList(relatesTo); - infoFamilyColsFilter.addFilter(createFiltersFromColumnQualifiers( - EntityColumnPrefix.RELATES_TO, relatesToCols)); - } - TimelineFilterList isRelatedTo = getFilters().getIsRelatedTo(); - if (hasField(fieldsToRetrieve, Field.IS_RELATED_TO)) { - // If IS_RELATED_TO field has to be retrieved, add a filter for fetching - // columns with IS_RELATED_TO column prefix. - infoFamilyColsFilter.addFilter(TimelineFilterUtils - .createHBaseQualifierFilter(CompareOp.EQUAL, - EntityColumnPrefix.IS_RELATED_TO)); - } else if (isRelatedTo != null && !isRelatedTo.getFilterList().isEmpty()) { - // Even if fields to retrieve does not contain IS_RELATED_TO, we still - // need to have a filter to fetch some of the column qualifiers if - // isRelatedTo filters are specified. isRelatedTo filters will then be - // matched after fetching rows from HBase. - Set<String> isRelatedToCols = - TimelineFilterUtils.fetchColumnsFromFilterList(isRelatedTo); - infoFamilyColsFilter.addFilter(createFiltersFromColumnQualifiers( - EntityColumnPrefix.IS_RELATED_TO, isRelatedToCols)); - } - TimelineFilterList eventFilters = getFilters().getEventFilters(); - if (hasField(fieldsToRetrieve, Field.EVENTS)) { - // If EVENTS field has to be retrieved, add a filter for fetching columns - // with EVENT column prefix. - infoFamilyColsFilter - .addFilter(TimelineFilterUtils.createHBaseQualifierFilter( - CompareOp.EQUAL, EntityColumnPrefix.EVENT)); - } else if (eventFilters != null && - !eventFilters.getFilterList().isEmpty()) { - // Even if fields to retrieve does not contain EVENTS, we still need to - // have a filter to fetch some of the column qualifiers on the basis of - // event filters specified. Event filters will then be matched after - // fetching rows from HBase. - Set<String> eventCols = - TimelineFilterUtils.fetchColumnsFromFilterList(eventFilters); - infoFamilyColsFilter.addFilter(createFiltersFromColumnQualifiers( - EntityColumnPrefix.EVENT, eventCols)); - } - return infoFamilyColsFilter; - } - - /** - * Exclude column prefixes via filters which are not required(based on fields - * to retrieve) from info column family. These filters are added to filter - * list which contains a filter for getting info column family. - * - * @param infoColFamilyList filter list for info column family. - */ - private void excludeFieldsFromInfoColFamily(FilterList infoColFamilyList) { - EnumSet<Field> fieldsToRetrieve = getDataToRetrieve().getFieldsToRetrieve(); - // Events not required. - if (!hasField(fieldsToRetrieve, Field.EVENTS)) { - infoColFamilyList.addFilter(TimelineFilterUtils - .createHBaseQualifierFilter(CompareOp.NOT_EQUAL, - EntityColumnPrefix.EVENT)); - } - // info not required. - if (!hasField(fieldsToRetrieve, Field.INFO)) { - infoColFamilyList.addFilter(TimelineFilterUtils - .createHBaseQualifierFilter(CompareOp.NOT_EQUAL, - EntityColumnPrefix.INFO)); - } - // is related to not required. - if (!hasField(fieldsToRetrieve, Field.IS_RELATED_TO)) { - infoColFamilyList.addFilter(TimelineFilterUtils - .createHBaseQualifierFilter(CompareOp.NOT_EQUAL, - EntityColumnPrefix.IS_RELATED_TO)); - } - // relates to not required. - if (!hasField(fieldsToRetrieve, Field.RELATES_TO)) { - infoColFamilyList.addFilter(TimelineFilterUtils - .createHBaseQualifierFilter(CompareOp.NOT_EQUAL, - EntityColumnPrefix.RELATES_TO)); - } - } - - /** - * Updates filter list based on fields for confs and metrics to retrieve. - * - * @param listBasedOnFields filter list based on fields. - * @throws IOException if any problem occurs while updating filter list. - */ - private void updateFilterForConfsAndMetricsToRetrieve( - FilterList listBasedOnFields) throws IOException { - TimelineDataToRetrieve dataToRetrieve = getDataToRetrieve(); - // Please note that if confsToRetrieve is specified, we would have added - // CONFS to fields to retrieve in augmentParams() even if not specified. - if (dataToRetrieve.getFieldsToRetrieve().contains(Field.CONFIGS)) { - // Create a filter list for configs. - listBasedOnFields.addFilter(TimelineFilterUtils - .createFilterForConfsOrMetricsToRetrieve( - dataToRetrieve.getConfsToRetrieve(), EntityColumnFamily.CONFIGS, - EntityColumnPrefix.CONFIG)); - } - - // Please note that if metricsToRetrieve is specified, we would have added - // METRICS to fields to retrieve in augmentParams() even if not specified. - if (dataToRetrieve.getFieldsToRetrieve().contains(Field.METRICS)) { - // Create a filter list for metrics. - listBasedOnFields.addFilter(TimelineFilterUtils - .createFilterForConfsOrMetricsToRetrieve( - dataToRetrieve.getMetricsToRetrieve(), - EntityColumnFamily.METRICS, EntityColumnPrefix.METRIC)); - } - } - - @Override - protected FilterList constructFilterListBasedOnFields() throws IOException { - if (!needCreateFilterListBasedOnFields()) { - // Fetch all the columns. No need of a filter. - return null; - } - FilterList listBasedOnFields = new FilterList(Operator.MUST_PASS_ONE); - FilterList infoColFamilyList = new FilterList(); - // By default fetch everything in INFO column family. - FamilyFilter infoColumnFamily = - new FamilyFilter(CompareOp.EQUAL, new BinaryComparator( - EntityColumnFamily.INFO.getBytes())); - infoColFamilyList.addFilter(infoColumnFamily); - if (!isSingleEntityRead() && fetchPartialColsFromInfoFamily()) { - // We can fetch only some of the columns from info family. - infoColFamilyList.addFilter(createFilterListForColsOfInfoFamily()); - } else { - // Exclude column prefixes in info column family which are not required - // based on fields to retrieve. - excludeFieldsFromInfoColFamily(infoColFamilyList); - } - listBasedOnFields.addFilter(infoColFamilyList); - updateFilterForConfsAndMetricsToRetrieve(listBasedOnFields); - return listBasedOnFields; - } - - @Override - protected void validateParams() { - Preconditions.checkNotNull(getContext(), "context shouldn't be null"); - Preconditions.checkNotNull(getDataToRetrieve(), - "data to retrieve shouldn't be null"); - Preconditions.checkNotNull(getContext().getClusterId(), - "clusterId shouldn't be null"); - Preconditions.checkNotNull(getContext().getAppId(), - "appId shouldn't be null"); - Preconditions.checkNotNull(getContext().getEntityType(), - "entityType shouldn't be null"); - if (isSingleEntityRead()) { - Preconditions.checkNotNull(getContext().getEntityId(), - "entityId shouldn't be null"); - } - } - - @Override - protected void augmentParams(Configuration hbaseConf, Connection conn) - throws IOException { - defaultAugmentParams(hbaseConf, conn); - // Add configs/metrics to fields to retrieve if confsToRetrieve and/or - // metricsToRetrieve are specified. - getDataToRetrieve().addFieldsBasedOnConfsAndMetricsToRetrieve(); - if (!isSingleEntityRead()) { - createFiltersIfNull(); - } - } - - @Override - protected Result getResult(Configuration hbaseConf, Connection conn, - FilterList filterList) throws IOException { - TimelineReaderContext context = getContext(); - Result result = null; - if (context.getEntityIdPrefix() != null) { - byte[] rowKey = new EntityRowKey(context.getClusterId(), - context.getUserId(), context.getFlowName(), context.getFlowRunId(), - context.getAppId(), context.getEntityType(), - context.getEntityIdPrefix(), context.getEntityId()).getRowKey(); - - Get get = new Get(rowKey); - get.setMaxVersions(getDataToRetrieve().getMetricsLimit()); - if (filterList != null && !filterList.getFilters().isEmpty()) { - get.setFilter(filterList); - } - result = getTable().getResult(hbaseConf, conn, get); - - } else { - // Prepare for range scan - // create single SingleColumnValueFilter and add to existing filters. - FilterList filter = new FilterList(Operator.MUST_PASS_ALL); - if (filterList != null && !filterList.getFilters().isEmpty()) { - filter.addFilter(filterList); - } - FilterList newFilter = new FilterList(); - newFilter.addFilter(TimelineFilterUtils.createHBaseSingleColValueFilter( - EntityColumn.ID, context.getEntityId(), CompareOp.EQUAL)); - newFilter.addFilter(new PageFilter(1)); - filter.addFilter(newFilter); - - ResultScanner results = getResults(hbaseConf, conn, filter); - try { - Iterator<Result> iterator = results.iterator(); - if (iterator.hasNext()) { - result = iterator.next(); - } - } finally { - results.close(); - } - } - return result; - } - - @Override - protected ResultScanner getResults(Configuration hbaseConf, Connection conn, - FilterList filterList) throws IOException { - // Scan through part of the table to find the entities belong to one app - // and one type - Scan scan = new Scan(); - TimelineReaderContext context = getContext(); - RowKeyPrefix<EntityRowKey> entityRowKeyPrefix = null; - // default mode, will always scans from beginning of entity type. - if (getFilters() == null || getFilters().getFromIdPrefix() == null) { - entityRowKeyPrefix = new EntityRowKeyPrefix(context.getClusterId(), - context.getUserId(), context.getFlowName(), context.getFlowRunId(), - context.getAppId(), context.getEntityType(), null, null); - scan.setRowPrefixFilter(entityRowKeyPrefix.getRowKeyPrefix()); - } else { // pagination mode, will scan from given entityIdPrefix!enitityId - entityRowKeyPrefix = new EntityRowKeyPrefix(context.getClusterId(), - context.getUserId(), context.getFlowName(), context.getFlowRunId(), - context.getAppId(), context.getEntityType(), - getFilters().getFromIdPrefix(), getFilters().getFromId()); - - // set start row - scan.setStartRow(entityRowKeyPrefix.getRowKeyPrefix()); - - // get the bytes for stop row - entityRowKeyPrefix = new EntityRowKeyPrefix(context.getClusterId(), - context.getUserId(), context.getFlowName(), context.getFlowRunId(), - context.getAppId(), context.getEntityType(), null, null); - - // set stop row - scan.setStopRow( - HBaseTimelineStorageUtils.calculateTheClosestNextRowKeyForPrefix( - entityRowKeyPrefix.getRowKeyPrefix())); - - // set page filter to limit. This filter has to set only in pagination - // mode. - filterList.addFilter(new PageFilter(getFilters().getLimit())); - } - scan.setMaxVersions(getDataToRetrieve().getMetricsLimit()); - if (filterList != null && !filterList.getFilters().isEmpty()) { - scan.setFilter(filterList); - } - return getTable().getResultScanner(hbaseConf, conn, scan); - } - - @Override - protected TimelineEntity parseEntity(Result result) throws IOException { - if (result == null || result.isEmpty()) { - return null; - } - TimelineEntity entity = new TimelineEntity(); - EntityRowKey parseRowKey = EntityRowKey.parseRowKey(result.getRow()); - entity.setType(parseRowKey.getEntityType()); - entity.setId(parseRowKey.getEntityId()); - entity.setIdPrefix(parseRowKey.getEntityIdPrefix().longValue()); - - TimelineEntityFilters filters = getFilters(); - // fetch created time - Long createdTime = (Long) EntityColumn.CREATED_TIME.readResult(result); - entity.setCreatedTime(createdTime); - - EnumSet<Field> fieldsToRetrieve = getDataToRetrieve().getFieldsToRetrieve(); - // fetch is related to entities and match isRelatedTo filter. If isRelatedTo - // filters do not match, entity would be dropped. We have to match filters - // locally as relevant HBase filters to filter out rows on the basis of - // isRelatedTo are not set in HBase scan. - boolean checkIsRelatedTo = - !isSingleEntityRead() && filters.getIsRelatedTo() != null - && filters.getIsRelatedTo().getFilterList().size() > 0; - if (hasField(fieldsToRetrieve, Field.IS_RELATED_TO) || checkIsRelatedTo) { - readRelationship(entity, result, EntityColumnPrefix.IS_RELATED_TO, true); - if (checkIsRelatedTo - && !TimelineStorageUtils.matchIsRelatedTo(entity, - filters.getIsRelatedTo())) { - return null; - } - if (!hasField(fieldsToRetrieve, Field.IS_RELATED_TO)) { - entity.getIsRelatedToEntities().clear(); - } - } - - // fetch relates to entities and match relatesTo filter. If relatesTo - // filters do not match, entity would be dropped. We have to match filters - // locally as relevant HBase filters to filter out rows on the basis of - // relatesTo are not set in HBase scan. - boolean checkRelatesTo = - !isSingleEntityRead() && filters.getRelatesTo() != null - && filters.getRelatesTo().getFilterList().size() > 0; - if (hasField(fieldsToRetrieve, Field.RELATES_TO) - || checkRelatesTo) { - readRelationship(entity, result, EntityColumnPrefix.RELATES_TO, false); - if (checkRelatesTo - && !TimelineStorageUtils.matchRelatesTo(entity, - filters.getRelatesTo())) { - return null; - } - if (!hasField(fieldsToRetrieve, Field.RELATES_TO)) { - entity.getRelatesToEntities().clear(); - } - } - - // fetch info if fieldsToRetrieve contains INFO or ALL. - if (hasField(fieldsToRetrieve, Field.INFO)) { - readKeyValuePairs(entity, result, EntityColumnPrefix.INFO, false); - } - - // fetch configs if fieldsToRetrieve contains CONFIGS or ALL. - if (hasField(fieldsToRetrieve, Field.CONFIGS)) { - readKeyValuePairs(entity, result, EntityColumnPrefix.CONFIG, true); - } - - // fetch events and match event filters if they exist. If event filters do - // not match, entity would be dropped. We have to match filters locally - // as relevant HBase filters to filter out rows on the basis of events - // are not set in HBase scan. - boolean checkEvents = - !isSingleEntityRead() && filters.getEventFilters() != null - && filters.getEventFilters().getFilterList().size() > 0; - if (hasField(fieldsToRetrieve, Field.EVENTS) || checkEvents) { - readEvents(entity, result, EntityColumnPrefix.EVENT); - if (checkEvents - && !TimelineStorageUtils.matchEventFilters(entity, - filters.getEventFilters())) { - return null; - } - if (!hasField(fieldsToRetrieve, Field.EVENTS)) { - entity.getEvents().clear(); - } - } - - // fetch metrics if fieldsToRetrieve contains METRICS or ALL. - if (hasField(fieldsToRetrieve, Field.METRICS)) { - readMetrics(entity, result, EntityColumnPrefix.METRIC); - } - return entity; - } - - /** - * Helper method for reading key-value pairs for either info or config. - * - * @param <T> Describes the type of column prefix. - * @param entity entity to fill. - * @param result result from HBase. - * @param prefix column prefix. - * @param isConfig if true, means we are reading configs, otherwise info. - * @throws IOException if any problem is encountered while reading result. - */ - protected <T> void readKeyValuePairs(TimelineEntity entity, Result result, - ColumnPrefix<T> prefix, boolean isConfig) throws IOException { - // info and configuration are of type Map<String, Object or String> - Map<String, Object> columns = - prefix.readResults(result, stringKeyConverter); - if (isConfig) { - for (Map.Entry<String, Object> column : columns.entrySet()) { - entity.addConfig(column.getKey(), column.getValue().toString()); - } - } else { - entity.addInfo(columns); - } - } -}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/673ab905/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java deleted file mode 100644 index 4c88cd3..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java +++ /dev/null @@ -1,458 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.server.timelineservice.storage.reader; - -import java.io.IOException; -import java.util.EnumSet; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedHashSet; -import java.util.Map; -import java.util.NavigableMap; -import java.util.Set; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.filter.BinaryPrefixComparator; -import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; -import org.apache.hadoop.hbase.filter.FilterList; -import org.apache.hadoop.hbase.filter.FilterList.Operator; -import org.apache.hadoop.hbase.filter.QualifierFilter; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; -import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve; -import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters; -import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext; -import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; -import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.EventColumnName; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.EventColumnNameConverter; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.StringKeyConverter; -import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix; - -/** - * The base class for reading and deserializing timeline entities from the - * HBase storage. Different types can be defined for different types of the - * entities that are being requested. - */ -public abstract class TimelineEntityReader extends - AbstractTimelineStorageReader { - private static final Log LOG = LogFactory.getLog(TimelineEntityReader.class); - - private final boolean singleEntityRead; - private TimelineDataToRetrieve dataToRetrieve; - // used only for multiple entity read mode - private TimelineEntityFilters filters; - - /** - * Main table the entity reader uses. - */ - private BaseTable<?> table; - - /** - * Used to convert strings key components to and from storage format. - */ - private final KeyConverter<String> stringKeyConverter = - new StringKeyConverter(); - - /** - * Instantiates a reader for multiple-entity reads. - * - * @param ctxt Reader context which defines the scope in which query has to be - * made. - * @param entityFilters Filters which limit the entities returned. - * @param toRetrieve Data to retrieve for each entity. - */ - protected TimelineEntityReader(TimelineReaderContext ctxt, - TimelineEntityFilters entityFilters, TimelineDataToRetrieve toRetrieve) { - super(ctxt); - this.singleEntityRead = false; - this.dataToRetrieve = toRetrieve; - this.filters = entityFilters; - - this.setTable(getTable()); - } - - /** - * Instantiates a reader for single-entity reads. - * - * @param ctxt Reader context which defines the scope in which query has to be - * made. - * @param toRetrieve Data to retrieve for each entity. - */ - protected TimelineEntityReader(TimelineReaderContext ctxt, - TimelineDataToRetrieve toRetrieve) { - super(ctxt); - this.singleEntityRead = true; - this.dataToRetrieve = toRetrieve; - - this.setTable(getTable()); - } - - /** - * Creates a {@link FilterList} based on fields, confs and metrics to - * retrieve. This filter list will be set in Scan/Get objects to trim down - * results fetched from HBase back-end storage. This is called only for - * multiple entity reads. - * - * @return a {@link FilterList} object. - * @throws IOException if any problem occurs while creating filter list. - */ - protected abstract FilterList constructFilterListBasedOnFields() - throws IOException; - - /** - * Creates a {@link FilterList} based on info, config and metric filters. This - * filter list will be set in HBase Get to trim down results fetched from - * HBase back-end storage. - * - * @return a {@link FilterList} object. - * @throws IOException if any problem occurs while creating filter list. - */ - protected abstract FilterList constructFilterListBasedOnFilters() - throws IOException; - - /** - * Combines filter lists created based on fields and based on filters. - * - * @return a {@link FilterList} object if it can be constructed. Returns null, - * if filter list cannot be created either on the basis of filters or on the - * basis of fields. - * @throws IOException if any problem occurs while creating filter list. - */ - private FilterList createFilterList() throws IOException { - FilterList listBasedOnFilters = constructFilterListBasedOnFilters(); - boolean hasListBasedOnFilters = listBasedOnFilters != null && - !listBasedOnFilters.getFilters().isEmpty(); - FilterList listBasedOnFields = constructFilterListBasedOnFields(); - boolean hasListBasedOnFields = listBasedOnFields != null && - !listBasedOnFields.getFilters().isEmpty(); - // If filter lists based on both filters and fields can be created, - // combine them in a new filter list and return it. - // If either one of them has been created, return that filter list. - // Return null, if none of the filter lists can be created. This indicates - // that no filter list needs to be added to HBase Scan as filters are not - // specified for the query or only the default view of entity needs to be - // returned. - if (hasListBasedOnFilters && hasListBasedOnFields) { - FilterList list = new FilterList(); - list.addFilter(listBasedOnFilters); - list.addFilter(listBasedOnFields); - return list; - } else if (hasListBasedOnFilters) { - return listBasedOnFilters; - } else if (hasListBasedOnFields) { - return listBasedOnFields; - } - return null; - } - - protected TimelineDataToRetrieve getDataToRetrieve() { - return dataToRetrieve; - } - - protected TimelineEntityFilters getFilters() { - return filters; - } - - /** - * Create a {@link TimelineEntityFilters} object with default values for - * filters. - */ - protected void createFiltersIfNull() { - if (filters == null) { - filters = new TimelineEntityFilters(); - } - } - - /** - * Reads and deserializes a single timeline entity from the HBase storage. - * - * @param hbaseConf HBase Configuration. - * @param conn HBase Connection. - * @return A <cite>TimelineEntity</cite> object. - * @throws IOException if there is any exception encountered while reading - * entity. - */ - public TimelineEntity readEntity(Configuration hbaseConf, Connection conn) - throws IOException { - validateParams(); - augmentParams(hbaseConf, conn); - - FilterList filterList = constructFilterListBasedOnFields(); - if (LOG.isDebugEnabled() && filterList != null) { - LOG.debug("FilterList created for get is - " + filterList); - } - Result result = getResult(hbaseConf, conn, filterList); - if (result == null || result.isEmpty()) { - // Could not find a matching row. - LOG.info("Cannot find matching entity of type " + - getContext().getEntityType()); - return null; - } - return parseEntity(result); - } - - /** - * Reads and deserializes a set of timeline entities from the HBase storage. - * It goes through all the results available, and returns the number of - * entries as specified in the limit in the entity's natural sort order. - * - * @param hbaseConf HBase Configuration. - * @param conn HBase Connection. - * @return a set of <cite>TimelineEntity</cite> objects. - * @throws IOException if any exception is encountered while reading entities. - */ - public Set<TimelineEntity> readEntities(Configuration hbaseConf, - Connection conn) throws IOException { - validateParams(); - augmentParams(hbaseConf, conn); - - Set<TimelineEntity> entities = new LinkedHashSet<>(); - FilterList filterList = createFilterList(); - if (LOG.isDebugEnabled() && filterList != null) { - LOG.debug("FilterList created for scan is - " + filterList); - } - ResultScanner results = getResults(hbaseConf, conn, filterList); - try { - for (Result result : results) { - TimelineEntity entity = parseEntity(result); - if (entity == null) { - continue; - } - entities.add(entity); - if (entities.size() == filters.getLimit()) { - break; - } - } - return entities; - } finally { - results.close(); - } - } - - /** - * Returns the main table to be used by the entity reader. - * - * @return A reference to the table. - */ - protected BaseTable<?> getTable() { - return table; - } - - /** - * Fetches a {@link Result} instance for a single-entity read. - * - * @param hbaseConf HBase Configuration. - * @param conn HBase Connection. - * @param filterList filter list which will be applied to HBase Get. - * @return the {@link Result} instance or null if no such record is found. - * @throws IOException if any exception is encountered while getting result. - */ - protected abstract Result getResult(Configuration hbaseConf, Connection conn, - FilterList filterList) throws IOException; - - /** - * Fetches a {@link ResultScanner} for a multi-entity read. - * - * @param hbaseConf HBase Configuration. - * @param conn HBase Connection. - * @param filterList filter list which will be applied to HBase Scan. - * @return the {@link ResultScanner} instance. - * @throws IOException if any exception is encountered while getting results. - */ - protected abstract ResultScanner getResults(Configuration hbaseConf, - Connection conn, FilterList filterList) throws IOException; - - /** - * Parses the result retrieved from HBase backend and convert it into a - * {@link TimelineEntity} object. - * - * @param result Single row result of a Get/Scan. - * @return the <cite>TimelineEntity</cite> instance or null if the entity is - * filtered. - * @throws IOException if any exception is encountered while parsing entity. - */ - protected abstract TimelineEntity parseEntity(Result result) - throws IOException; - - /** - * Helper method for reading and deserializing {@link TimelineMetric} objects - * using the specified column prefix. The timeline metrics then are added to - * the given timeline entity. - * - * @param entity {@link TimelineEntity} object. - * @param result {@link Result} object retrieved from backend. - * @param columnPrefix Metric column prefix - * @throws IOException if any exception is encountered while reading metrics. - */ - protected void readMetrics(TimelineEntity entity, Result result, - ColumnPrefix<?> columnPrefix) throws IOException { - NavigableMap<String, NavigableMap<Long, Number>> metricsResult = - columnPrefix.readResultsWithTimestamps( - result, stringKeyConverter); - for (Map.Entry<String, NavigableMap<Long, Number>> metricResult: - metricsResult.entrySet()) { - TimelineMetric metric = new TimelineMetric(); - metric.setId(metricResult.getKey()); - // Simply assume that if the value set contains more than 1 elements, the - // metric is a TIME_SERIES metric, otherwise, it's a SINGLE_VALUE metric - TimelineMetric.Type metricType = metricResult.getValue().size() > 1 ? - TimelineMetric.Type.TIME_SERIES : TimelineMetric.Type.SINGLE_VALUE; - metric.setType(metricType); - metric.addValues(metricResult.getValue()); - entity.addMetric(metric); - } - } - - /** - * Checks whether the reader has been created to fetch single entity or - * multiple entities. - * - * @return true, if query is for single entity, false otherwise. - */ - public boolean isSingleEntityRead() { - return singleEntityRead; - } - - protected void setTable(BaseTable<?> baseTable) { - this.table = baseTable; - } - - /** - * Check if we have a certain field amongst fields to retrieve. This method - * checks against {@link Field#ALL} as well because that would mean field - * passed needs to be matched. - * - * @param fieldsToRetrieve fields to be retrieved. - * @param requiredField fields to be checked in fieldsToRetrieve. - * @return true if has the required field, false otherwise. - */ - protected boolean hasField(EnumSet<Field> fieldsToRetrieve, - Field requiredField) { - return fieldsToRetrieve.contains(Field.ALL) || - fieldsToRetrieve.contains(requiredField); - } - - /** - * Create a filter list of qualifier filters based on passed set of columns. - * - * @param <T> Describes the type of column prefix. - * @param colPrefix Column Prefix. - * @param columns set of column qualifiers. - * @return filter list. - */ - protected <T> FilterList createFiltersFromColumnQualifiers( - ColumnPrefix<T> colPrefix, Set<String> columns) { - FilterList list = new FilterList(Operator.MUST_PASS_ONE); - for (String column : columns) { - // For columns which have compound column qualifiers (eg. events), we need - // to include the required separator. - byte[] compoundColQual = createColQualifierPrefix(colPrefix, column); - list.addFilter(new QualifierFilter(CompareOp.EQUAL, - new BinaryPrefixComparator(colPrefix - .getColumnPrefixBytes(compoundColQual)))); - } - return list; - } - - protected <T> byte[] createColQualifierPrefix(ColumnPrefix<T> colPrefix, - String column) { - if (colPrefix == ApplicationColumnPrefix.EVENT - || colPrefix == EntityColumnPrefix.EVENT) { - return new EventColumnName(column, null, null).getColumnQualifier(); - } else { - return stringKeyConverter.encode(column); - } - } - - /** - * Helper method for reading relationship. - * - * @param <T> Describes the type of column prefix. - * @param entity entity to fill. - * @param result result from HBase. - * @param prefix column prefix. - * @param isRelatedTo if true, means relationship is to be added to - * isRelatedTo, otherwise its added to relatesTo. - * @throws IOException if any problem is encountered while reading result. - */ - protected <T> void readRelationship(TimelineEntity entity, Result result, - ColumnPrefix<T> prefix, boolean isRelatedTo) throws IOException { - // isRelatedTo and relatesTo are of type Map<String, Set<String>> - Map<String, Object> columns = - prefix.readResults(result, stringKeyConverter); - for (Map.Entry<String, Object> column : columns.entrySet()) { - for (String id : Separator.VALUES.splitEncoded(column.getValue() - .toString())) { - if (isRelatedTo) { - entity.addIsRelatedToEntity(column.getKey(), id); - } else { - entity.addRelatesToEntity(column.getKey(), id); - } - } - } - } - - /** - * Read events from the entity table or the application table. The column name - * is of the form "eventId=timestamp=infoKey" where "infoKey" may be omitted - * if there is no info associated with the event. - * - * @param <T> Describes the type of column prefix. - * @param entity entity to fill. - * @param result HBase Result. - * @param prefix column prefix. - * @throws IOException if any problem is encountered while reading result. - */ - protected static <T> void readEvents(TimelineEntity entity, Result result, - ColumnPrefix<T> prefix) throws IOException { - Map<String, TimelineEvent> eventsMap = new HashMap<>(); - Map<EventColumnName, Object> eventsResult = - prefix.readResults(result, new EventColumnNameConverter()); - for (Map.Entry<EventColumnName, Object> - eventResult : eventsResult.entrySet()) { - EventColumnName eventColumnName = eventResult.getKey(); - String key = eventColumnName.getId() + - Long.toString(eventColumnName.getTimestamp()); - // Retrieve previously seen event to add to it - TimelineEvent event = eventsMap.get(key); - if (event == null) { - // First time we're seeing this event, add it to the eventsMap - event = new TimelineEvent(); - event.setId(eventColumnName.getId()); - event.setTimestamp(eventColumnName.getTimestamp()); - eventsMap.put(key, event); - } - if (eventColumnName.getInfoKey() != null) { - event.addInfo(eventColumnName.getInfoKey(), eventResult.getValue()); - } - } - Set<TimelineEvent> eventsSet = new HashSet<>(eventsMap.values()); - entity.addEvents(eventsSet); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/673ab905/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReaderFactory.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReaderFactory.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReaderFactory.java deleted file mode 100644 index 16fffa4..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReaderFactory.java +++ /dev/null @@ -1,102 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.server.timelineservice.storage.reader; - -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; -import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve; -import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters; -import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext; - -/** - * Factory methods for instantiating a timeline entity reader. - */ -public final class TimelineEntityReaderFactory { - private TimelineEntityReaderFactory() { - } - - /** - * Creates a timeline entity reader instance for reading a single entity with - * the specified input. - * - * @param context Reader context which defines the scope in which query has to - * be made. - * @param dataToRetrieve Data to retrieve for each entity. - * @return An implementation of <cite>TimelineEntityReader</cite> object - * depending on entity type. - */ - public static TimelineEntityReader createSingleEntityReader( - TimelineReaderContext context, TimelineDataToRetrieve dataToRetrieve) { - // currently the types that are handled separate from the generic entity - // table are application, flow run, and flow activity entities - if (TimelineEntityType.YARN_APPLICATION.matches(context.getEntityType())) { - return new ApplicationEntityReader(context, dataToRetrieve); - } else if (TimelineEntityType. - YARN_FLOW_RUN.matches(context.getEntityType())) { - return new FlowRunEntityReader(context, dataToRetrieve); - } else if (TimelineEntityType. - YARN_FLOW_ACTIVITY.matches(context.getEntityType())) { - return new FlowActivityEntityReader(context, dataToRetrieve); - } else { - // assume we're dealing with a generic entity read - return new GenericEntityReader(context, dataToRetrieve); - } - } - - /** - * Creates a timeline entity reader instance for reading set of entities with - * the specified input and predicates. - * - * @param context Reader context which defines the scope in which query has to - * be made. - * @param filters Filters which limit the entities returned. - * @param dataToRetrieve Data to retrieve for each entity. - * @return An implementation of <cite>TimelineEntityReader</cite> object - * depending on entity type. - */ - public static TimelineEntityReader createMultipleEntitiesReader( - TimelineReaderContext context, TimelineEntityFilters filters, - TimelineDataToRetrieve dataToRetrieve) { - // currently the types that are handled separate from the generic entity - // table are application, flow run, and flow activity entities - if (TimelineEntityType.YARN_APPLICATION.matches(context.getEntityType())) { - return new ApplicationEntityReader(context, filters, dataToRetrieve); - } else if (TimelineEntityType. - YARN_FLOW_ACTIVITY.matches(context.getEntityType())) { - return new FlowActivityEntityReader(context, filters, dataToRetrieve); - } else if (TimelineEntityType. - YARN_FLOW_RUN.matches(context.getEntityType())) { - return new FlowRunEntityReader(context, filters, dataToRetrieve); - } else { - // assume we're dealing with a generic entity read - return new GenericEntityReader(context, filters, dataToRetrieve); - } - } - - /** - * Creates a timeline entity type reader that will read all available entity - * types within the specified context. - * - * @param context Reader context which defines the scope in which query has to - * be made. Limited to application level only. - * @return an <cite>EntityTypeReader</cite> object - */ - public static EntityTypeReader createEntityTypeReader( - TimelineReaderContext context) { - return new EntityTypeReader(context); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/673ab905/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/package-info.java deleted file mode 100644 index 9814d6d..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/package-info.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * Package org.apache.hadoop.yarn.server.timelineservice.storage.reader - * contains classes used to read entities from backend based on query type. - */ -@InterfaceAudience.Private -@InterfaceStability.Unstable -package org.apache.hadoop.yarn.server.timelineservice.storage.reader; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/673ab905/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestKeyConverters.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestKeyConverters.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestKeyConverters.java deleted file mode 100644 index 58df970..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestKeyConverters.java +++ /dev/null @@ -1,130 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.timelineservice.storage.common; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; - -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.junit.Test; - -public class TestKeyConverters { - - @Test - public void testAppIdKeyConverter() { - AppIdKeyConverter appIdKeyConverter = new AppIdKeyConverter(); - long currentTs = System.currentTimeMillis(); - ApplicationId appId1 = ApplicationId.newInstance(currentTs, 1); - ApplicationId appId2 = ApplicationId.newInstance(currentTs, 2); - ApplicationId appId3 = ApplicationId.newInstance(currentTs + 300, 1); - String appIdStr1 = appId1.toString(); - String appIdStr2 = appId2.toString(); - String appIdStr3 = appId3.toString(); - byte[] appIdBytes1 = appIdKeyConverter.encode(appIdStr1); - byte[] appIdBytes2 = appIdKeyConverter.encode(appIdStr2); - byte[] appIdBytes3 = appIdKeyConverter.encode(appIdStr3); - // App ids' should be encoded in a manner wherein descending order - // is maintained. - assertTrue( - "Ordering of app ids' is incorrect", - Bytes.compareTo(appIdBytes1, appIdBytes2) > 0 - && Bytes.compareTo(appIdBytes1, appIdBytes3) > 0 - && Bytes.compareTo(appIdBytes2, appIdBytes3) > 0); - String decodedAppId1 = appIdKeyConverter.decode(appIdBytes1); - String decodedAppId2 = appIdKeyConverter.decode(appIdBytes2); - String decodedAppId3 = appIdKeyConverter.decode(appIdBytes3); - assertTrue("Decoded app id is not same as the app id encoded", - appIdStr1.equals(decodedAppId1)); - assertTrue("Decoded app id is not same as the app id encoded", - appIdStr2.equals(decodedAppId2)); - assertTrue("Decoded app id is not same as the app id encoded", - appIdStr3.equals(decodedAppId3)); - } - - @Test - public void testEventColumnNameConverter() { - String eventId = "=foo_=eve=nt="; - byte[] valSepBytes = Bytes.toBytes(Separator.VALUES.getValue()); - byte[] maxByteArr = - Bytes.createMaxByteArray(Bytes.SIZEOF_LONG - valSepBytes.length); - byte[] ts = Bytes.add(valSepBytes, maxByteArr); - Long eventTs = Bytes.toLong(ts); - byte[] byteEventColName = - new EventColumnName(eventId, eventTs, null).getColumnQualifier(); - KeyConverter<EventColumnName> eventColumnNameConverter = - new EventColumnNameConverter(); - EventColumnName eventColName = - eventColumnNameConverter.decode(byteEventColName); - assertEquals(eventId, eventColName.getId()); - assertEquals(eventTs, eventColName.getTimestamp()); - assertNull(eventColName.getInfoKey()); - - String infoKey = "f=oo_event_in=fo=_key"; - byteEventColName = - new EventColumnName(eventId, eventTs, infoKey).getColumnQualifier(); - eventColName = eventColumnNameConverter.decode(byteEventColName); - assertEquals(eventId, eventColName.getId()); - assertEquals(eventTs, eventColName.getTimestamp()); - assertEquals(infoKey, eventColName.getInfoKey()); - } - - @Test - public void testLongKeyConverter() { - LongKeyConverter longKeyConverter = new LongKeyConverter(); - confirmLongKeyConverter(longKeyConverter, Long.MIN_VALUE); - confirmLongKeyConverter(longKeyConverter, -1234567890L); - confirmLongKeyConverter(longKeyConverter, -128L); - confirmLongKeyConverter(longKeyConverter, -127L); - confirmLongKeyConverter(longKeyConverter, -1L); - confirmLongKeyConverter(longKeyConverter, 0L); - confirmLongKeyConverter(longKeyConverter, 1L); - confirmLongKeyConverter(longKeyConverter, 127L); - confirmLongKeyConverter(longKeyConverter, 128L); - confirmLongKeyConverter(longKeyConverter, 1234567890L); - confirmLongKeyConverter(longKeyConverter, Long.MAX_VALUE); - } - - private void confirmLongKeyConverter(LongKeyConverter longKeyConverter, - Long testValue) { - Long decoded = longKeyConverter.decode(longKeyConverter.encode(testValue)); - assertEquals(testValue, decoded); - } - - @Test - public void testStringKeyConverter() { - StringKeyConverter stringKeyConverter = new StringKeyConverter(); - String phrase = "QuackAttack now!"; - - for (int i = 0; i < phrase.length(); i++) { - String sub = phrase.substring(i, phrase.length()); - confirmStrignKeyConverter(stringKeyConverter, sub); - confirmStrignKeyConverter(stringKeyConverter, sub + sub); - } - } - - private void confirmStrignKeyConverter(StringKeyConverter stringKeyConverter, - String testValue) { - String decoded = - stringKeyConverter.decode(stringKeyConverter.encode(testValue)); - assertEquals(testValue, decoded); - } - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/673ab905/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestRowKeys.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestRowKeys.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestRowKeys.java deleted file mode 100644 index cbd2273..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestRowKeys.java +++ /dev/null @@ -1,250 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.server.timelineservice.storage.common; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; -import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey; -import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKeyPrefix; -import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey; -import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey; -import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKeyPrefix; -import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKey; -import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKeyPrefix; -import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKey; -import org.junit.Test; - - -public class TestRowKeys { - - private final static String QUALIFIER_SEP = Separator.QUALIFIERS.getValue(); - private final static byte[] QUALIFIER_SEP_BYTES = Bytes - .toBytes(QUALIFIER_SEP); - private final static String CLUSTER = "cl" + QUALIFIER_SEP + "uster"; - private final static String USER = QUALIFIER_SEP + "user"; - private final static String FLOW_NAME = "dummy_" + QUALIFIER_SEP + "flow" - + QUALIFIER_SEP; - private final static Long FLOW_RUN_ID; - private final static String APPLICATION_ID; - static { - long runid = Long.MAX_VALUE - 900L; - byte[] longMaxByteArr = Bytes.toBytes(Long.MAX_VALUE); - byte[] byteArr = Bytes.toBytes(runid); - int sepByteLen = QUALIFIER_SEP_BYTES.length; - if (sepByteLen <= byteArr.length) { - for (int i = 0; i < sepByteLen; i++) { - byteArr[i] = (byte) (longMaxByteArr[i] - QUALIFIER_SEP_BYTES[i]); - } - } - FLOW_RUN_ID = Bytes.toLong(byteArr); - long clusterTs = System.currentTimeMillis(); - byteArr = Bytes.toBytes(clusterTs); - if (sepByteLen <= byteArr.length) { - for (int i = 0; i < sepByteLen; i++) { - byteArr[byteArr.length - sepByteLen + i] = - (byte) (longMaxByteArr[byteArr.length - sepByteLen + i] - - QUALIFIER_SEP_BYTES[i]); - } - } - clusterTs = Bytes.toLong(byteArr); - int seqId = 222; - APPLICATION_ID = ApplicationId.newInstance(clusterTs, seqId).toString(); - } - - private static void verifyRowPrefixBytes(byte[] byteRowKeyPrefix) { - int sepLen = QUALIFIER_SEP_BYTES.length; - for (int i = 0; i < sepLen; i++) { - assertTrue( - "Row key prefix not encoded properly.", - byteRowKeyPrefix[byteRowKeyPrefix.length - sepLen + i] == - QUALIFIER_SEP_BYTES[i]); - } - } - - @Test - public void testApplicationRowKey() { - byte[] byteRowKey = - new ApplicationRowKey(CLUSTER, USER, FLOW_NAME, FLOW_RUN_ID, - APPLICATION_ID).getRowKey(); - ApplicationRowKey rowKey = ApplicationRowKey.parseRowKey(byteRowKey); - assertEquals(CLUSTER, rowKey.getClusterId()); - assertEquals(USER, rowKey.getUserId()); - assertEquals(FLOW_NAME, rowKey.getFlowName()); - assertEquals(FLOW_RUN_ID, rowKey.getFlowRunId()); - assertEquals(APPLICATION_ID, rowKey.getAppId()); - - byte[] byteRowKeyPrefix = - new ApplicationRowKeyPrefix(CLUSTER, USER, FLOW_NAME, FLOW_RUN_ID) - .getRowKeyPrefix(); - byte[][] splits = - Separator.QUALIFIERS.split(byteRowKeyPrefix, - new int[] {Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE, - Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG, - Separator.VARIABLE_SIZE}); - assertEquals(5, splits.length); - assertEquals(0, splits[4].length); - assertEquals(FLOW_NAME, - Separator.QUALIFIERS.decode(Bytes.toString(splits[2]))); - assertEquals(FLOW_RUN_ID, - (Long) LongConverter.invertLong(Bytes.toLong(splits[3]))); - verifyRowPrefixBytes(byteRowKeyPrefix); - - byteRowKeyPrefix = - new ApplicationRowKeyPrefix(CLUSTER, USER, FLOW_NAME).getRowKeyPrefix(); - splits = - Separator.QUALIFIERS.split(byteRowKeyPrefix, new int[] { - Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE, - Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE }); - assertEquals(4, splits.length); - assertEquals(0, splits[3].length); - assertEquals(FLOW_NAME, - Separator.QUALIFIERS.decode(Bytes.toString(splits[2]))); - verifyRowPrefixBytes(byteRowKeyPrefix); - } - - /** - * Tests the converters indirectly through the public methods of the - * corresponding rowkey. - */ - @Test - public void testAppToFlowRowKey() { - byte[] byteRowKey = new AppToFlowRowKey(APPLICATION_ID).getRowKey(); - AppToFlowRowKey rowKey = AppToFlowRowKey.parseRowKey(byteRowKey); - assertEquals(APPLICATION_ID, rowKey.getAppId()); - } - - @Test - public void testEntityRowKey() { - TimelineEntity entity = new TimelineEntity(); - entity.setId("!ent!ity!!id!"); - entity.setType("entity!Type"); - entity.setIdPrefix(54321); - - byte[] byteRowKey = - new EntityRowKey(CLUSTER, USER, FLOW_NAME, FLOW_RUN_ID, APPLICATION_ID, - entity.getType(), entity.getIdPrefix(), - entity.getId()).getRowKey(); - EntityRowKey rowKey = EntityRowKey.parseRowKey(byteRowKey); - assertEquals(CLUSTER, rowKey.getClusterId()); - assertEquals(USER, rowKey.getUserId()); - assertEquals(FLOW_NAME, rowKey.getFlowName()); - assertEquals(FLOW_RUN_ID, rowKey.getFlowRunId()); - assertEquals(APPLICATION_ID, rowKey.getAppId()); - assertEquals(entity.getType(), rowKey.getEntityType()); - assertEquals(entity.getIdPrefix(), rowKey.getEntityIdPrefix().longValue()); - assertEquals(entity.getId(), rowKey.getEntityId()); - - byte[] byteRowKeyPrefix = - new EntityRowKeyPrefix(CLUSTER, USER, FLOW_NAME, FLOW_RUN_ID, - APPLICATION_ID, entity.getType(), null, null) - .getRowKeyPrefix(); - byte[][] splits = - Separator.QUALIFIERS.split( - byteRowKeyPrefix, - new int[] {Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE, - Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG, - AppIdKeyConverter.getKeySize(), Separator.VARIABLE_SIZE, - Bytes.SIZEOF_LONG, Separator.VARIABLE_SIZE }); - assertEquals(7, splits.length); - assertEquals(APPLICATION_ID, new AppIdKeyConverter().decode(splits[4])); - assertEquals(entity.getType(), - Separator.QUALIFIERS.decode(Bytes.toString(splits[5]))); - verifyRowPrefixBytes(byteRowKeyPrefix); - - byteRowKeyPrefix = - new EntityRowKeyPrefix(CLUSTER, USER, FLOW_NAME, FLOW_RUN_ID, - APPLICATION_ID).getRowKeyPrefix(); - splits = - Separator.QUALIFIERS.split( - byteRowKeyPrefix, - new int[] {Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE, - Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG, - AppIdKeyConverter.getKeySize(), Separator.VARIABLE_SIZE}); - assertEquals(6, splits.length); - assertEquals(0, splits[5].length); - AppIdKeyConverter appIdKeyConverter = new AppIdKeyConverter(); - assertEquals(APPLICATION_ID, appIdKeyConverter.decode(splits[4])); - verifyRowPrefixBytes(byteRowKeyPrefix); - } - - @Test - public void testFlowActivityRowKey() { - Long ts = 1459900830000L; - Long dayTimestamp = HBaseTimelineStorageUtils.getTopOfTheDayTimestamp(ts); - byte[] byteRowKey = - new FlowActivityRowKey(CLUSTER, ts, USER, FLOW_NAME).getRowKey(); - FlowActivityRowKey rowKey = FlowActivityRowKey.parseRowKey(byteRowKey); - assertEquals(CLUSTER, rowKey.getClusterId()); - assertEquals(dayTimestamp, rowKey.getDayTimestamp()); - assertEquals(USER, rowKey.getUserId()); - assertEquals(FLOW_NAME, rowKey.getFlowName()); - - byte[] byteRowKeyPrefix = - new FlowActivityRowKeyPrefix(CLUSTER).getRowKeyPrefix(); - byte[][] splits = - Separator.QUALIFIERS.split(byteRowKeyPrefix, new int[] { - Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE }); - assertEquals(2, splits.length); - assertEquals(0, splits[1].length); - assertEquals(CLUSTER, - Separator.QUALIFIERS.decode(Bytes.toString(splits[0]))); - verifyRowPrefixBytes(byteRowKeyPrefix); - - byteRowKeyPrefix = - new FlowActivityRowKeyPrefix(CLUSTER, ts).getRowKeyPrefix(); - splits = - Separator.QUALIFIERS.split(byteRowKeyPrefix, - new int[] {Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG, - Separator.VARIABLE_SIZE}); - assertEquals(3, splits.length); - assertEquals(0, splits[2].length); - assertEquals(CLUSTER, - Separator.QUALIFIERS.decode(Bytes.toString(splits[0]))); - assertEquals(ts, - (Long) LongConverter.invertLong(Bytes.toLong(splits[1]))); - verifyRowPrefixBytes(byteRowKeyPrefix); - } - - @Test - public void testFlowRunRowKey() { - byte[] byteRowKey = - new FlowRunRowKey(CLUSTER, USER, FLOW_NAME, FLOW_RUN_ID).getRowKey(); - FlowRunRowKey rowKey = FlowRunRowKey.parseRowKey(byteRowKey); - assertEquals(CLUSTER, rowKey.getClusterId()); - assertEquals(USER, rowKey.getUserId()); - assertEquals(FLOW_NAME, rowKey.getFlowName()); - assertEquals(FLOW_RUN_ID, rowKey.getFlowRunId()); - - byte[] byteRowKeyPrefix = - new FlowRunRowKey(CLUSTER, USER, FLOW_NAME, null).getRowKey(); - byte[][] splits = - Separator.QUALIFIERS.split(byteRowKeyPrefix, new int[] { - Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE, - Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE }); - assertEquals(4, splits.length); - assertEquals(0, splits[3].length); - assertEquals(FLOW_NAME, - Separator.QUALIFIERS.decode(Bytes.toString(splits[2]))); - verifyRowPrefixBytes(byteRowKeyPrefix); - } - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/673ab905/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestSeparator.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestSeparator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestSeparator.java deleted file mode 100644 index 7d37206..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestSeparator.java +++ /dev/null @@ -1,215 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.hadoop.yarn.server.timelineservice.storage.common; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; - -import org.apache.hadoop.hbase.util.Bytes; -import org.junit.Test; - -import com.google.common.collect.Iterables; - -public class TestSeparator { - - private static String villain = "Dr. Heinz Doofenshmirtz"; - private static String special = - ". * | ? + \t ( ) [ ] { } ^ $ \\ \" %"; - - /** - * - */ - @Test - public void testEncodeDecodeString() { - - for (Separator separator : Separator.values()) { - testEncodeDecode(separator, ""); - testEncodeDecode(separator, " "); - testEncodeDecode(separator, "!"); - testEncodeDecode(separator, "?"); - testEncodeDecode(separator, "&"); - testEncodeDecode(separator, "+"); - testEncodeDecode(separator, "\t"); - testEncodeDecode(separator, "Dr."); - testEncodeDecode(separator, "Heinz"); - testEncodeDecode(separator, "Doofenshmirtz"); - testEncodeDecode(separator, villain); - testEncodeDecode(separator, special); - - assertNull(separator.encode(null)); - - } - } - - private void testEncodeDecode(Separator separator, String token) { - String encoded = separator.encode(token); - String decoded = separator.decode(encoded); - String msg = "token:" + token + " separator:" + separator + "."; - assertEquals(msg, token, decoded); - } - - @Test - public void testEncodeDecode() { - testEncodeDecode("Dr.", Separator.QUALIFIERS); - testEncodeDecode("Heinz", Separator.QUALIFIERS, Separator.QUALIFIERS); - testEncodeDecode("Doofenshmirtz", Separator.QUALIFIERS, null, - Separator.QUALIFIERS); - testEncodeDecode("&Perry", Separator.QUALIFIERS, Separator.VALUES, null); - testEncodeDecode("the ", Separator.QUALIFIERS, Separator.SPACE); - testEncodeDecode("Platypus...", (Separator) null); - testEncodeDecode("The what now ?!?", Separator.QUALIFIERS, - Separator.VALUES, Separator.SPACE); - - } - @Test - public void testEncodedValues() { - testEncodeDecode("Double-escape %2$ and %9$ or %%2$ or %%3$, nor %%%2$" + - "= no problem!", - Separator.QUALIFIERS, Separator.VALUES, Separator.SPACE, Separator.TAB); - } - - @Test - public void testSplits() { - byte[] maxLongBytes = Bytes.toBytes(Long.MAX_VALUE); - byte[] maxIntBytes = Bytes.toBytes(Integer.MAX_VALUE); - for (Separator separator : Separator.values()) { - String str1 = "cl" + separator.getValue() + "us"; - String str2 = separator.getValue() + "rst"; - byte[] sepByteArr = Bytes.toBytes(separator.getValue()); - byte[] longVal1Arr = Bytes.add(sepByteArr, Bytes.copy(maxLongBytes, - sepByteArr.length, Bytes.SIZEOF_LONG - sepByteArr.length)); - byte[] intVal1Arr = Bytes.add(sepByteArr, Bytes.copy(maxIntBytes, - sepByteArr.length, Bytes.SIZEOF_INT - sepByteArr.length)); - byte[] arr = separator.join( - Bytes.toBytes(separator.encode(str1)), longVal1Arr, - Bytes.toBytes(separator.encode(str2)), intVal1Arr); - int[] sizes = {Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG, - Separator.VARIABLE_SIZE, Bytes.SIZEOF_INT}; - byte[][] splits = separator.split(arr, sizes); - assertEquals(4, splits.length); - assertEquals(str1, separator.decode(Bytes.toString(splits[0]))); - assertEquals(Bytes.toLong(longVal1Arr), Bytes.toLong(splits[1])); - assertEquals(str2, separator.decode(Bytes.toString(splits[2]))); - assertEquals(Bytes.toInt(intVal1Arr), Bytes.toInt(splits[3])); - - longVal1Arr = Bytes.add(Bytes.copy(maxLongBytes, 0, Bytes.SIZEOF_LONG - - sepByteArr.length), sepByteArr); - intVal1Arr = Bytes.add(Bytes.copy(maxIntBytes, 0, Bytes.SIZEOF_INT - - sepByteArr.length), sepByteArr); - arr = separator.join(Bytes.toBytes(separator.encode(str1)), longVal1Arr, - Bytes.toBytes(separator.encode(str2)), intVal1Arr); - splits = separator.split(arr, sizes); - assertEquals(4, splits.length); - assertEquals(str1, separator.decode(Bytes.toString(splits[0]))); - assertEquals(Bytes.toLong(longVal1Arr), Bytes.toLong(splits[1])); - assertEquals(str2, separator.decode(Bytes.toString(splits[2]))); - assertEquals(Bytes.toInt(intVal1Arr), Bytes.toInt(splits[3])); - - longVal1Arr = Bytes.add(sepByteArr, Bytes.copy(maxLongBytes, - sepByteArr.length, 4 - sepByteArr.length), sepByteArr); - longVal1Arr = Bytes.add(longVal1Arr, Bytes.copy(maxLongBytes, 4, 3 - - sepByteArr.length), sepByteArr); - arr = separator.join(Bytes.toBytes(separator.encode(str1)), longVal1Arr, - Bytes.toBytes(separator.encode(str2)), intVal1Arr); - splits = separator.split(arr, sizes); - assertEquals(4, splits.length); - assertEquals(str1, separator.decode(Bytes.toString(splits[0]))); - assertEquals(Bytes.toLong(longVal1Arr), Bytes.toLong(splits[1])); - assertEquals(str2, separator.decode(Bytes.toString(splits[2]))); - assertEquals(Bytes.toInt(intVal1Arr), Bytes.toInt(splits[3])); - - arr = separator.join(Bytes.toBytes(separator.encode(str1)), - Bytes.toBytes(separator.encode(str2)), intVal1Arr, longVal1Arr); - int[] sizes1 = {Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE, - Bytes.SIZEOF_INT, Bytes.SIZEOF_LONG}; - splits = separator.split(arr, sizes1); - assertEquals(4, splits.length); - assertEquals(str1, separator.decode(Bytes.toString(splits[0]))); - assertEquals(str2, separator.decode(Bytes.toString(splits[1]))); - assertEquals(Bytes.toInt(intVal1Arr), Bytes.toInt(splits[2])); - assertEquals(Bytes.toLong(longVal1Arr), Bytes.toLong(splits[3])); - - try { - int[] sizes2 = {Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE, - Bytes.SIZEOF_INT, 7}; - splits = separator.split(arr, sizes2); - fail("Exception should have been thrown."); - } catch (IllegalArgumentException e) {} - - try { - int[] sizes2 = {Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE, 2, - Bytes.SIZEOF_LONG}; - splits = separator.split(arr, sizes2); - fail("Exception should have been thrown."); - } catch (IllegalArgumentException e) {} - } - } - - /** - * Simple test to encode and decode using the same separators and confirm that - * we end up with the same as what we started with. - * - * @param token - * @param separators - */ - private static void testEncodeDecode(String token, Separator... separators) { - byte[] encoded = Separator.encode(token, separators); - String decoded = Separator.decode(encoded, separators); - assertEquals(token, decoded); - } - - @Test - public void testJoinStripped() { - List<String> stringList = new ArrayList<String>(0); - stringList.add("nothing"); - - String joined = Separator.VALUES.joinEncoded(stringList); - Iterable<String> split = Separator.VALUES.splitEncoded(joined); - assertTrue(Iterables.elementsEqual(stringList, split)); - - stringList = new ArrayList<String>(3); - stringList.add("a"); - stringList.add("b?"); - stringList.add("c"); - - joined = Separator.VALUES.joinEncoded(stringList); - split = Separator.VALUES.splitEncoded(joined); - assertTrue(Iterables.elementsEqual(stringList, split)); - - String[] stringArray1 = {"else"}; - joined = Separator.VALUES.joinEncoded(stringArray1); - split = Separator.VALUES.splitEncoded(joined); - assertTrue(Iterables.elementsEqual(Arrays.asList(stringArray1), split)); - - String[] stringArray2 = {"d", "e?", "f"}; - joined = Separator.VALUES.joinEncoded(stringArray2); - split = Separator.VALUES.splitEncoded(joined); - assertTrue(Iterables.elementsEqual(Arrays.asList(stringArray2), split)); - - List<String> empty = new ArrayList<String>(0); - split = Separator.VALUES.splitEncoded(null); - assertTrue(Iterables.elementsEqual(empty, split)); - - } - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/673ab905/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/pom.xml ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/pom.xml index 1b6b1d5..7dcdc02 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/pom.xml @@ -44,6 +44,7 @@ <module>hadoop-yarn-server-applicationhistoryservice</module> <module>hadoop-yarn-server-timeline-pluginstorage</module> <module>hadoop-yarn-server-timelineservice</module> + <module>hadoop-yarn-server-timelineservice-hbase</module> <module>hadoop-yarn-server-timelineservice-hbase-tests</module> </modules> </project> http://git-wip-us.apache.org/repos/asf/hadoop/blob/673ab905/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/TimelineServiceV2.md ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/TimelineServiceV2.md b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/TimelineServiceV2.md index e53b05d..02b0562 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/TimelineServiceV2.md +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/TimelineServiceV2.md @@ -223,7 +223,7 @@ is needed for the `flowrun` table creation in the schema creator. The default HD For example, hadoop fs -mkdir /hbase/coprocessor - hadoop fs -put hadoop-yarn-server-timelineservice-3.0.0-alpha1-SNAPSHOT.jar + hadoop fs -put hadoop-yarn-server-timelineservice-hbase-3.0.0-alpha1-SNAPSHOT.jar /hbase/coprocessor/hadoop-yarn-server-timelineservice.jar http://git-wip-us.apache.org/repos/asf/hadoop/blob/673ab905/hadoop-yarn-project/pom.xml ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/pom.xml b/hadoop-yarn-project/pom.xml index 7ddb31a..b5c2849 100644 --- a/hadoop-yarn-project/pom.xml +++ b/hadoop-yarn-project/pom.xml @@ -75,6 +75,10 @@ <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-yarn-server-web-proxy</artifactId> </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-server-timelineservice-hbase</artifactId> + </dependency> </dependencies> <build> --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org