http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/SubApplicationEntityReader.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/SubApplicationEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/SubApplicationEntityReader.java new file mode 100644 index 0000000..faed348 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/SubApplicationEntityReader.java @@ -0,0 +1,489 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage.reader; + +import java.io.IOException; +import java.util.EnumSet; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.Query; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.filter.BinaryComparator; +import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; +import org.apache.hadoop.hbase.filter.FamilyFilter; +import org.apache.hadoop.hbase.filter.FilterList; +import org.apache.hadoop.hbase.filter.FilterList.Operator; +import org.apache.hadoop.hbase.filter.PageFilter; +import org.apache.hadoop.hbase.filter.QualifierFilter; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve; +import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters; +import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext; +import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderUtils; +import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList; +import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterUtils; +import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTableRW; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnRWHelper; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.RowKeyPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; +import org.apache.hadoop.yarn.server.timelineservice.storage.subapplication.SubApplicationColumn; +import org.apache.hadoop.yarn.server.timelineservice.storage.subapplication.SubApplicationColumnFamily; +import org.apache.hadoop.yarn.server.timelineservice.storage.subapplication.SubApplicationColumnPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.subapplication.SubApplicationRowKey; +import org.apache.hadoop.yarn.server.timelineservice.storage.subapplication.SubApplicationRowKeyPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.subapplication.SubApplicationTableRW; +import org.apache.hadoop.yarn.webapp.BadRequestException; + +import com.google.common.base.Preconditions; + +class SubApplicationEntityReader extends GenericEntityReader { + private static final SubApplicationTableRW SUB_APPLICATION_TABLE = + new SubApplicationTableRW(); + + SubApplicationEntityReader(TimelineReaderContext ctxt, + TimelineEntityFilters entityFilters, TimelineDataToRetrieve toRetrieve) { + super(ctxt, entityFilters, toRetrieve); + } + + SubApplicationEntityReader(TimelineReaderContext ctxt, + TimelineDataToRetrieve toRetrieve) { + super(ctxt, toRetrieve); + } + + /** + * Uses the {@link SubApplicationTableRW}. + */ + protected BaseTableRW<?> getTable() { + return SUB_APPLICATION_TABLE; + } + + @Override + protected FilterList constructFilterListBasedOnFilters() throws IOException { + // Filters here cannot be null for multiple entity reads as they are set in + // augmentParams if null. + FilterList listBasedOnFilters = new FilterList(); + TimelineEntityFilters filters = getFilters(); + // Create filter list based on created time range and add it to + // listBasedOnFilters. + long createdTimeBegin = filters.getCreatedTimeBegin(); + long createdTimeEnd = filters.getCreatedTimeEnd(); + if (createdTimeBegin != 0 || createdTimeEnd != Long.MAX_VALUE) { + listBasedOnFilters.addFilter(TimelineFilterUtils + .createSingleColValueFiltersByRange(SubApplicationColumn.CREATED_TIME, + createdTimeBegin, createdTimeEnd)); + } + // Create filter list based on metric filters and add it to + // listBasedOnFilters. + TimelineFilterList metricFilters = filters.getMetricFilters(); + if (metricFilters != null && !metricFilters.getFilterList().isEmpty()) { + listBasedOnFilters.addFilter(TimelineFilterUtils.createHBaseFilterList( + SubApplicationColumnPrefix.METRIC, metricFilters)); + } + // Create filter list based on config filters and add it to + // listBasedOnFilters. + TimelineFilterList configFilters = filters.getConfigFilters(); + if (configFilters != null && !configFilters.getFilterList().isEmpty()) { + listBasedOnFilters.addFilter(TimelineFilterUtils.createHBaseFilterList( + SubApplicationColumnPrefix.CONFIG, configFilters)); + } + // Create filter list based on info filters and add it to listBasedOnFilters + TimelineFilterList infoFilters = filters.getInfoFilters(); + if (infoFilters != null && !infoFilters.getFilterList().isEmpty()) { + listBasedOnFilters.addFilter(TimelineFilterUtils + .createHBaseFilterList(SubApplicationColumnPrefix.INFO, infoFilters)); + } + return listBasedOnFilters; + } + + /** + * Add {@link QualifierFilter} filters to filter list for each column of + * entity table. + * + * @param list filter list to which qualifier filters have to be added. + */ + protected void updateFixedColumns(FilterList list) { + for (SubApplicationColumn column : SubApplicationColumn.values()) { + list.addFilter(new QualifierFilter(CompareOp.EQUAL, + new BinaryComparator(column.getColumnQualifierBytes()))); + } + } + + /** + * Creates a filter list which indicates that only some of the column + * qualifiers in the info column family will be returned in result. + * + * @param isApplication If true, it means operations are to be performed for + * application table, otherwise for entity table. + * @return filter list. + * @throws IOException if any problem occurs while creating filter list. + */ + private FilterList createFilterListForColsOfInfoFamily() throws IOException { + FilterList infoFamilyColsFilter = new FilterList(Operator.MUST_PASS_ONE); + // Add filters for each column in entity table. + updateFixedColumns(infoFamilyColsFilter); + EnumSet<Field> fieldsToRetrieve = getDataToRetrieve().getFieldsToRetrieve(); + // If INFO field has to be retrieved, add a filter for fetching columns + // with INFO column prefix. + if (hasField(fieldsToRetrieve, Field.INFO)) { + infoFamilyColsFilter.addFilter( + TimelineFilterUtils.createHBaseQualifierFilter(CompareOp.EQUAL, + SubApplicationColumnPrefix.INFO)); + } + TimelineFilterList relatesTo = getFilters().getRelatesTo(); + if (hasField(fieldsToRetrieve, Field.RELATES_TO)) { + // If RELATES_TO field has to be retrieved, add a filter for fetching + // columns with RELATES_TO column prefix. + infoFamilyColsFilter.addFilter( + TimelineFilterUtils.createHBaseQualifierFilter(CompareOp.EQUAL, + SubApplicationColumnPrefix.RELATES_TO)); + } else if (relatesTo != null && !relatesTo.getFilterList().isEmpty()) { + // Even if fields to retrieve does not contain RELATES_TO, we still + // need to have a filter to fetch some of the column qualifiers if + // relatesTo filters are specified. relatesTo filters will then be + // matched after fetching rows from HBase. + Set<String> relatesToCols = + TimelineFilterUtils.fetchColumnsFromFilterList(relatesTo); + infoFamilyColsFilter.addFilter(createFiltersFromColumnQualifiers( + SubApplicationColumnPrefix.RELATES_TO, relatesToCols)); + } + TimelineFilterList isRelatedTo = getFilters().getIsRelatedTo(); + if (hasField(fieldsToRetrieve, Field.IS_RELATED_TO)) { + // If IS_RELATED_TO field has to be retrieved, add a filter for fetching + // columns with IS_RELATED_TO column prefix. + infoFamilyColsFilter.addFilter( + TimelineFilterUtils.createHBaseQualifierFilter(CompareOp.EQUAL, + SubApplicationColumnPrefix.IS_RELATED_TO)); + } else if (isRelatedTo != null && !isRelatedTo.getFilterList().isEmpty()) { + // Even if fields to retrieve does not contain IS_RELATED_TO, we still + // need to have a filter to fetch some of the column qualifiers if + // isRelatedTo filters are specified. isRelatedTo filters will then be + // matched after fetching rows from HBase. + Set<String> isRelatedToCols = + TimelineFilterUtils.fetchColumnsFromFilterList(isRelatedTo); + infoFamilyColsFilter.addFilter(createFiltersFromColumnQualifiers( + SubApplicationColumnPrefix.IS_RELATED_TO, isRelatedToCols)); + } + TimelineFilterList eventFilters = getFilters().getEventFilters(); + if (hasField(fieldsToRetrieve, Field.EVENTS)) { + // If EVENTS field has to be retrieved, add a filter for fetching columns + // with EVENT column prefix. + infoFamilyColsFilter.addFilter( + TimelineFilterUtils.createHBaseQualifierFilter(CompareOp.EQUAL, + SubApplicationColumnPrefix.EVENT)); + } else if (eventFilters != null + && !eventFilters.getFilterList().isEmpty()) { + // Even if fields to retrieve does not contain EVENTS, we still need to + // have a filter to fetch some of the column qualifiers on the basis of + // event filters specified. Event filters will then be matched after + // fetching rows from HBase. + Set<String> eventCols = + TimelineFilterUtils.fetchColumnsFromFilterList(eventFilters); + infoFamilyColsFilter.addFilter(createFiltersFromColumnQualifiers( + SubApplicationColumnPrefix.EVENT, eventCols)); + } + return infoFamilyColsFilter; + } + + /** + * Exclude column prefixes via filters which are not required(based on fields + * to retrieve) from info column family. These filters are added to filter + * list which contains a filter for getting info column family. + * + * @param infoColFamilyList filter list for info column family. + */ + private void excludeFieldsFromInfoColFamily(FilterList infoColFamilyList) { + EnumSet<Field> fieldsToRetrieve = getDataToRetrieve().getFieldsToRetrieve(); + // Events not required. + if (!hasField(fieldsToRetrieve, Field.EVENTS)) { + infoColFamilyList.addFilter( + TimelineFilterUtils.createHBaseQualifierFilter(CompareOp.NOT_EQUAL, + SubApplicationColumnPrefix.EVENT)); + } + // info not required. + if (!hasField(fieldsToRetrieve, Field.INFO)) { + infoColFamilyList.addFilter( + TimelineFilterUtils.createHBaseQualifierFilter(CompareOp.NOT_EQUAL, + SubApplicationColumnPrefix.INFO)); + } + // is related to not required. + if (!hasField(fieldsToRetrieve, Field.IS_RELATED_TO)) { + infoColFamilyList.addFilter( + TimelineFilterUtils.createHBaseQualifierFilter(CompareOp.NOT_EQUAL, + SubApplicationColumnPrefix.IS_RELATED_TO)); + } + // relates to not required. + if (!hasField(fieldsToRetrieve, Field.RELATES_TO)) { + infoColFamilyList.addFilter( + TimelineFilterUtils.createHBaseQualifierFilter(CompareOp.NOT_EQUAL, + SubApplicationColumnPrefix.RELATES_TO)); + } + } + + /** + * Updates filter list based on fields for confs and metrics to retrieve. + * + * @param listBasedOnFields filter list based on fields. + * @throws IOException if any problem occurs while updating filter list. + */ + private void updateFilterForConfsAndMetricsToRetrieve( + FilterList listBasedOnFields) throws IOException { + TimelineDataToRetrieve dataToRetrieve = getDataToRetrieve(); + // Please note that if confsToRetrieve is specified, we would have added + // CONFS to fields to retrieve in augmentParams() even if not specified. + if (dataToRetrieve.getFieldsToRetrieve().contains(Field.CONFIGS)) { + // Create a filter list for configs. + listBasedOnFields.addFilter( + TimelineFilterUtils.createFilterForConfsOrMetricsToRetrieve( + dataToRetrieve.getConfsToRetrieve(), + SubApplicationColumnFamily.CONFIGS, + SubApplicationColumnPrefix.CONFIG)); + } + + // Please note that if metricsToRetrieve is specified, we would have added + // METRICS to fields to retrieve in augmentParams() even if not specified. + if (dataToRetrieve.getFieldsToRetrieve().contains(Field.METRICS)) { + // Create a filter list for metrics. + listBasedOnFields.addFilter( + TimelineFilterUtils.createFilterForConfsOrMetricsToRetrieve( + dataToRetrieve.getMetricsToRetrieve(), + SubApplicationColumnFamily.METRICS, + SubApplicationColumnPrefix.METRIC)); + } + } + + @Override + protected FilterList constructFilterListBasedOnFields() throws IOException { + if (!needCreateFilterListBasedOnFields()) { + // Fetch all the columns. No need of a filter. + return null; + } + FilterList listBasedOnFields = new FilterList(Operator.MUST_PASS_ONE); + FilterList infoColFamilyList = new FilterList(); + // By default fetch everything in INFO column family. + FamilyFilter infoColumnFamily = new FamilyFilter(CompareOp.EQUAL, + new BinaryComparator(SubApplicationColumnFamily.INFO.getBytes())); + infoColFamilyList.addFilter(infoColumnFamily); + if (fetchPartialColsFromInfoFamily()) { + // We can fetch only some of the columns from info family. + infoColFamilyList.addFilter(createFilterListForColsOfInfoFamily()); + } else { + // Exclude column prefixes in info column family which are not required + // based on fields to retrieve. + excludeFieldsFromInfoColFamily(infoColFamilyList); + } + listBasedOnFields.addFilter(infoColFamilyList); + updateFilterForConfsAndMetricsToRetrieve(listBasedOnFields); + return listBasedOnFields; + } + + @Override + protected void validateParams() { + Preconditions.checkNotNull(getContext(), "context shouldn't be null"); + Preconditions.checkNotNull(getDataToRetrieve(), + "data to retrieve shouldn't be null"); + Preconditions.checkNotNull(getContext().getClusterId(), + "clusterId shouldn't be null"); + Preconditions.checkNotNull(getContext().getDoAsUser(), + "DoAsUser shouldn't be null"); + Preconditions.checkNotNull(getContext().getEntityType(), + "entityType shouldn't be null"); + } + + @Override + protected void augmentParams(Configuration hbaseConf, Connection conn) + throws IOException { + getDataToRetrieve().addFieldsBasedOnConfsAndMetricsToRetrieve(); + createFiltersIfNull(); + } + + private void setMetricsTimeRange(Query query) { + // Set time range for metric values. + HBaseTimelineStorageUtils.setMetricsTimeRange(query, + SubApplicationColumnFamily.METRICS.getBytes(), + getDataToRetrieve().getMetricsTimeBegin(), + getDataToRetrieve().getMetricsTimeEnd()); + } + + @Override + protected ResultScanner getResults(Configuration hbaseConf, Connection conn, + FilterList filterList) throws IOException { + + // Scan through part of the table to find the entities belong to one app + // and one type + Scan scan = new Scan(); + TimelineReaderContext context = getContext(); + if (context.getDoAsUser() == null) { + throw new BadRequestException("Invalid user!"); + } + + RowKeyPrefix<SubApplicationRowKey> subApplicationRowKeyPrefix = null; + // default mode, will always scans from beginning of entity type. + if (getFilters() == null || getFilters().getFromId() == null) { + subApplicationRowKeyPrefix = new SubApplicationRowKeyPrefix( + context.getDoAsUser(), context.getClusterId(), + context.getEntityType(), null, null, null); + scan.setRowPrefixFilter(subApplicationRowKeyPrefix.getRowKeyPrefix()); + } else { // pagination mode, will scan from given entityIdPrefix!enitityId + + SubApplicationRowKey entityRowKey = null; + try { + entityRowKey = SubApplicationRowKey + .parseRowKeyFromString(getFilters().getFromId()); + } catch (IllegalArgumentException e) { + throw new BadRequestException("Invalid filter fromid is provided."); + } + if (!context.getClusterId().equals(entityRowKey.getClusterId())) { + throw new BadRequestException( + "fromid doesn't belong to clusterId=" + context.getClusterId()); + } + + // set start row + scan.setStartRow(entityRowKey.getRowKey()); + + // get the bytes for stop row + subApplicationRowKeyPrefix = new SubApplicationRowKeyPrefix( + context.getDoAsUser(), context.getClusterId(), + context.getEntityType(), null, null, null); + + // set stop row + scan.setStopRow( + HBaseTimelineStorageUtils.calculateTheClosestNextRowKeyForPrefix( + subApplicationRowKeyPrefix.getRowKeyPrefix())); + + // set page filter to limit. This filter has to set only in pagination + // mode. + filterList.addFilter(new PageFilter(getFilters().getLimit())); + } + setMetricsTimeRange(scan); + scan.setMaxVersions(getDataToRetrieve().getMetricsLimit()); + if (filterList != null && !filterList.getFilters().isEmpty()) { + scan.setFilter(filterList); + } + return getTable().getResultScanner(hbaseConf, conn, scan); + } + + @Override + protected Result getResult(Configuration hbaseConf, Connection conn, + FilterList filterList) throws IOException { + throw new UnsupportedOperationException( + "we don't support a single entity query"); + } + + @Override + protected TimelineEntity parseEntity(Result result) throws IOException { + if (result == null || result.isEmpty()) { + return null; + } + TimelineEntity entity = new TimelineEntity(); + SubApplicationRowKey parseRowKey = + SubApplicationRowKey.parseRowKey(result.getRow()); + entity.setType(parseRowKey.getEntityType()); + entity.setId(parseRowKey.getEntityId()); + entity.setIdPrefix(parseRowKey.getEntityIdPrefix().longValue()); + + TimelineEntityFilters filters = getFilters(); + // fetch created time + Long createdTime = (Long) ColumnRWHelper.readResult(result, + SubApplicationColumn.CREATED_TIME); + entity.setCreatedTime(createdTime); + + EnumSet<Field> fieldsToRetrieve = getDataToRetrieve().getFieldsToRetrieve(); + // fetch is related to entities and match isRelatedTo filter. If isRelatedTo + // filters do not match, entity would be dropped. We have to match filters + // locally as relevant HBase filters to filter out rows on the basis of + // isRelatedTo are not set in HBase scan. + boolean checkIsRelatedTo = + filters.getIsRelatedTo() != null + && filters.getIsRelatedTo().getFilterList().size() > 0; + if (hasField(fieldsToRetrieve, Field.IS_RELATED_TO) || checkIsRelatedTo) { + readRelationship(entity, result, SubApplicationColumnPrefix.IS_RELATED_TO, + true); + if (checkIsRelatedTo && !TimelineStorageUtils.matchIsRelatedTo(entity, + filters.getIsRelatedTo())) { + return null; + } + if (!hasField(fieldsToRetrieve, Field.IS_RELATED_TO)) { + entity.getIsRelatedToEntities().clear(); + } + } + + // fetch relates to entities and match relatesTo filter. If relatesTo + // filters do not match, entity would be dropped. We have to match filters + // locally as relevant HBase filters to filter out rows on the basis of + // relatesTo are not set in HBase scan. + boolean checkRelatesTo = + !isSingleEntityRead() && filters.getRelatesTo() != null + && filters.getRelatesTo().getFilterList().size() > 0; + if (hasField(fieldsToRetrieve, Field.RELATES_TO) || checkRelatesTo) { + readRelationship(entity, result, SubApplicationColumnPrefix.RELATES_TO, + false); + if (checkRelatesTo && !TimelineStorageUtils.matchRelatesTo(entity, + filters.getRelatesTo())) { + return null; + } + if (!hasField(fieldsToRetrieve, Field.RELATES_TO)) { + entity.getRelatesToEntities().clear(); + } + } + + // fetch info if fieldsToRetrieve contains INFO or ALL. + if (hasField(fieldsToRetrieve, Field.INFO)) { + readKeyValuePairs(entity, result, SubApplicationColumnPrefix.INFO, false); + } + + // fetch configs if fieldsToRetrieve contains CONFIGS or ALL. + if (hasField(fieldsToRetrieve, Field.CONFIGS)) { + readKeyValuePairs(entity, result, SubApplicationColumnPrefix.CONFIG, + true); + } + + // fetch events and match event filters if they exist. If event filters do + // not match, entity would be dropped. We have to match filters locally + // as relevant HBase filters to filter out rows on the basis of events + // are not set in HBase scan. + boolean checkEvents = + !isSingleEntityRead() && filters.getEventFilters() != null + && filters.getEventFilters().getFilterList().size() > 0; + if (hasField(fieldsToRetrieve, Field.EVENTS) || checkEvents) { + readEvents(entity, result, SubApplicationColumnPrefix.EVENT); + if (checkEvents && !TimelineStorageUtils.matchEventFilters(entity, + filters.getEventFilters())) { + return null; + } + if (!hasField(fieldsToRetrieve, Field.EVENTS)) { + entity.getEvents().clear(); + } + } + + // fetch metrics if fieldsToRetrieve contains METRICS or ALL. + if (hasField(fieldsToRetrieve, Field.METRICS)) { + readMetrics(entity, result, SubApplicationColumnPrefix.METRIC); + } + + entity.getInfo().put(TimelineReaderUtils.FROMID_KEY, + parseRowKey.getRowKeyAsString()); + return entity; + } + +}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java new file mode 100644 index 0000000..3168163 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java @@ -0,0 +1,464 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage.reader; + +import java.io.IOException; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashSet; +import java.util.Map; +import java.util.NavigableMap; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.filter.BinaryPrefixComparator; +import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; +import org.apache.hadoop.hbase.filter.FilterList; +import org.apache.hadoop.hbase.filter.FilterList.Operator; +import org.apache.hadoop.hbase.filter.QualifierFilter; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; +import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve; +import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters; +import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext; +import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; +import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTableRW; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnRWHelper; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.EventColumnName; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.EventColumnNameConverter; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.StringKeyConverter; +import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The base class for reading and deserializing timeline entities from the + * HBase storage. Different types can be defined for different types of the + * entities that are being requested. + */ +public abstract class TimelineEntityReader extends + AbstractTimelineStorageReader { + private static final Logger LOG = + LoggerFactory.getLogger(TimelineEntityReader.class); + + private final boolean singleEntityRead; + private TimelineDataToRetrieve dataToRetrieve; + // used only for multiple entity read mode + private TimelineEntityFilters filters; + + /** + * Main table the entity reader uses. + */ + private BaseTableRW<?> table; + + /** + * Used to convert strings key components to and from storage format. + */ + private final KeyConverter<String> stringKeyConverter = + new StringKeyConverter(); + + /** + * Instantiates a reader for multiple-entity reads. + * + * @param ctxt Reader context which defines the scope in which query has to be + * made. + * @param entityFilters Filters which limit the entities returned. + * @param toRetrieve Data to retrieve for each entity. + */ + protected TimelineEntityReader(TimelineReaderContext ctxt, + TimelineEntityFilters entityFilters, TimelineDataToRetrieve toRetrieve) { + super(ctxt); + this.singleEntityRead = false; + this.dataToRetrieve = toRetrieve; + this.filters = entityFilters; + + this.setTable(getTable()); + } + + /** + * Instantiates a reader for single-entity reads. + * + * @param ctxt Reader context which defines the scope in which query has to be + * made. + * @param toRetrieve Data to retrieve for each entity. + */ + protected TimelineEntityReader(TimelineReaderContext ctxt, + TimelineDataToRetrieve toRetrieve) { + super(ctxt); + this.singleEntityRead = true; + this.dataToRetrieve = toRetrieve; + + this.setTable(getTable()); + } + + /** + * Creates a {@link FilterList} based on fields, confs and metrics to + * retrieve. This filter list will be set in Scan/Get objects to trim down + * results fetched from HBase back-end storage. This is called only for + * multiple entity reads. + * + * @return a {@link FilterList} object. + * @throws IOException if any problem occurs while creating filter list. + */ + protected abstract FilterList constructFilterListBasedOnFields() + throws IOException; + + /** + * Creates a {@link FilterList} based on info, config and metric filters. This + * filter list will be set in HBase Get to trim down results fetched from + * HBase back-end storage. + * + * @return a {@link FilterList} object. + * @throws IOException if any problem occurs while creating filter list. + */ + protected abstract FilterList constructFilterListBasedOnFilters() + throws IOException; + + /** + * Combines filter lists created based on fields and based on filters. + * + * @return a {@link FilterList} object if it can be constructed. Returns null, + * if filter list cannot be created either on the basis of filters or on the + * basis of fields. + * @throws IOException if any problem occurs while creating filter list. + */ + private FilterList createFilterList() throws IOException { + FilterList listBasedOnFilters = constructFilterListBasedOnFilters(); + boolean hasListBasedOnFilters = listBasedOnFilters != null && + !listBasedOnFilters.getFilters().isEmpty(); + FilterList listBasedOnFields = constructFilterListBasedOnFields(); + boolean hasListBasedOnFields = listBasedOnFields != null && + !listBasedOnFields.getFilters().isEmpty(); + // If filter lists based on both filters and fields can be created, + // combine them in a new filter list and return it. + // If either one of them has been created, return that filter list. + // Return null, if none of the filter lists can be created. This indicates + // that no filter list needs to be added to HBase Scan as filters are not + // specified for the query or only the default view of entity needs to be + // returned. + if (hasListBasedOnFilters && hasListBasedOnFields) { + FilterList list = new FilterList(); + list.addFilter(listBasedOnFilters); + list.addFilter(listBasedOnFields); + return list; + } else if (hasListBasedOnFilters) { + return listBasedOnFilters; + } else if (hasListBasedOnFields) { + return listBasedOnFields; + } + return null; + } + + protected TimelineDataToRetrieve getDataToRetrieve() { + return dataToRetrieve; + } + + protected TimelineEntityFilters getFilters() { + return filters; + } + + /** + * Create a {@link TimelineEntityFilters} object with default values for + * filters. + */ + protected void createFiltersIfNull() { + if (filters == null) { + filters = new TimelineEntityFilters.Builder().build(); + } + } + + /** + * Reads and deserializes a single timeline entity from the HBase storage. + * + * @param hbaseConf HBase Configuration. + * @param conn HBase Connection. + * @return A <cite>TimelineEntity</cite> object. + * @throws IOException if there is any exception encountered while reading + * entity. + */ + public TimelineEntity readEntity(Configuration hbaseConf, Connection conn) + throws IOException { + validateParams(); + augmentParams(hbaseConf, conn); + + FilterList filterList = constructFilterListBasedOnFields(); + if (LOG.isDebugEnabled() && filterList != null) { + LOG.debug("FilterList created for get is - " + filterList); + } + Result result = getResult(hbaseConf, conn, filterList); + if (result == null || result.isEmpty()) { + // Could not find a matching row. + LOG.info("Cannot find matching entity of type " + + getContext().getEntityType()); + return null; + } + return parseEntity(result); + } + + /** + * Reads and deserializes a set of timeline entities from the HBase storage. + * It goes through all the results available, and returns the number of + * entries as specified in the limit in the entity's natural sort order. + * + * @param hbaseConf HBase Configuration. + * @param conn HBase Connection. + * @return a set of <cite>TimelineEntity</cite> objects. + * @throws IOException if any exception is encountered while reading entities. + */ + public Set<TimelineEntity> readEntities(Configuration hbaseConf, + Connection conn) throws IOException { + validateParams(); + augmentParams(hbaseConf, conn); + + Set<TimelineEntity> entities = new LinkedHashSet<>(); + FilterList filterList = createFilterList(); + if (LOG.isDebugEnabled() && filterList != null) { + LOG.debug("FilterList created for scan is - " + filterList); + } + ResultScanner results = getResults(hbaseConf, conn, filterList); + try { + for (Result result : results) { + TimelineEntity entity = parseEntity(result); + if (entity == null) { + continue; + } + entities.add(entity); + if (entities.size() == filters.getLimit()) { + break; + } + } + return entities; + } finally { + results.close(); + } + } + + /** + * Returns the main table to be used by the entity reader. + * + * @return A reference to the table. + */ + protected BaseTableRW<?> getTable() { + return table; + } + + /** + * Fetches a {@link Result} instance for a single-entity read. + * + * @param hbaseConf HBase Configuration. + * @param conn HBase Connection. + * @param filterList filter list which will be applied to HBase Get. + * @return the {@link Result} instance or null if no such record is found. + * @throws IOException if any exception is encountered while getting result. + */ + protected abstract Result getResult(Configuration hbaseConf, Connection conn, + FilterList filterList) throws IOException; + + /** + * Fetches a {@link ResultScanner} for a multi-entity read. + * + * @param hbaseConf HBase Configuration. + * @param conn HBase Connection. + * @param filterList filter list which will be applied to HBase Scan. + * @return the {@link ResultScanner} instance. + * @throws IOException if any exception is encountered while getting results. + */ + protected abstract ResultScanner getResults(Configuration hbaseConf, + Connection conn, FilterList filterList) throws IOException; + + /** + * Parses the result retrieved from HBase backend and convert it into a + * {@link TimelineEntity} object. + * + * @param result Single row result of a Get/Scan. + * @return the <cite>TimelineEntity</cite> instance or null if the entity is + * filtered. + * @throws IOException if any exception is encountered while parsing entity. + */ + protected abstract TimelineEntity parseEntity(Result result) + throws IOException; + + /** + * Helper method for reading and deserializing {@link TimelineMetric} objects + * using the specified column prefix. The timeline metrics then are added to + * the given timeline entity. + * + * @param entity {@link TimelineEntity} object. + * @param result {@link Result} object retrieved from backend. + * @param columnPrefix Metric column prefix + * @throws IOException if any exception is encountered while reading metrics. + */ + protected void readMetrics(TimelineEntity entity, Result result, + ColumnPrefix<?> columnPrefix) throws IOException { + NavigableMap<String, NavigableMap<Long, Number>> metricsResult = + ColumnRWHelper.readResultsWithTimestamps( + result, columnPrefix, stringKeyConverter); + for (Map.Entry<String, NavigableMap<Long, Number>> metricResult: + metricsResult.entrySet()) { + TimelineMetric metric = new TimelineMetric(); + metric.setId(metricResult.getKey()); + // Simply assume that if the value set contains more than 1 elements, the + // metric is a TIME_SERIES metric, otherwise, it's a SINGLE_VALUE metric + TimelineMetric.Type metricType = metricResult.getValue().size() > 1 ? + TimelineMetric.Type.TIME_SERIES : TimelineMetric.Type.SINGLE_VALUE; + metric.setType(metricType); + metric.addValues(metricResult.getValue()); + entity.addMetric(metric); + } + } + + /** + * Checks whether the reader has been created to fetch single entity or + * multiple entities. + * + * @return true, if query is for single entity, false otherwise. + */ + public boolean isSingleEntityRead() { + return singleEntityRead; + } + + protected void setTable(BaseTableRW<?> baseTable) { + this.table = baseTable; + } + + /** + * Check if we have a certain field amongst fields to retrieve. This method + * checks against {@link Field#ALL} as well because that would mean field + * passed needs to be matched. + * + * @param fieldsToRetrieve fields to be retrieved. + * @param requiredField fields to be checked in fieldsToRetrieve. + * @return true if has the required field, false otherwise. + */ + protected boolean hasField(EnumSet<Field> fieldsToRetrieve, + Field requiredField) { + return fieldsToRetrieve.contains(Field.ALL) || + fieldsToRetrieve.contains(requiredField); + } + + /** + * Create a filter list of qualifier filters based on passed set of columns. + * + * @param <T> Describes the type of column prefix. + * @param colPrefix Column Prefix. + * @param columns set of column qualifiers. + * @return filter list. + */ + protected <T extends BaseTable<T>> FilterList + createFiltersFromColumnQualifiers( + ColumnPrefix<T> colPrefix, Set<String> columns) { + FilterList list = new FilterList(Operator.MUST_PASS_ONE); + for (String column : columns) { + // For columns which have compound column qualifiers (eg. events), we need + // to include the required separator. + byte[] compoundColQual = createColQualifierPrefix(colPrefix, column); + list.addFilter(new QualifierFilter(CompareOp.EQUAL, + new BinaryPrefixComparator(colPrefix + .getColumnPrefixBytes(compoundColQual)))); + } + return list; + } + + protected <T extends BaseTable<T>> byte[] createColQualifierPrefix( + ColumnPrefix<T> colPrefix, String column) { + if (colPrefix == ApplicationColumnPrefix.EVENT + || colPrefix == EntityColumnPrefix.EVENT) { + return new EventColumnName(column, null, null).getColumnQualifier(); + } else { + return stringKeyConverter.encode(column); + } + } + + /** + * Helper method for reading relationship. + * + * @param <T> Describes the type of column prefix. + * @param entity entity to fill. + * @param result result from HBase. + * @param prefix column prefix. + * @param isRelatedTo if true, means relationship is to be added to + * isRelatedTo, otherwise its added to relatesTo. + * @throws IOException if any problem is encountered while reading result. + */ + protected <T extends BaseTable<T>> void readRelationship( + TimelineEntity entity, Result result, + ColumnPrefix<T> prefix, boolean isRelatedTo) throws IOException { + // isRelatedTo and relatesTo are of type Map<String, Set<String>> + Map<String, Object> columns = ColumnRWHelper.readResults( + result, prefix, stringKeyConverter); + for (Map.Entry<String, Object> column : columns.entrySet()) { + for (String id : Separator.VALUES.splitEncoded(column.getValue() + .toString())) { + if (isRelatedTo) { + entity.addIsRelatedToEntity(column.getKey(), id); + } else { + entity.addRelatesToEntity(column.getKey(), id); + } + } + } + } + + /** + * Read events from the entity table or the application table. The column name + * is of the form "eventId=timestamp=infoKey" where "infoKey" may be omitted + * if there is no info associated with the event. + * + * @param <T> Describes the type of column prefix. + * @param entity entity to fill. + * @param result HBase Result. + * @param prefix column prefix. + * @throws IOException if any problem is encountered while reading result. + */ + protected static <T extends BaseTable<T>> void readEvents( + TimelineEntity entity, Result result, + ColumnPrefix<T> prefix) throws IOException { + Map<String, TimelineEvent> eventsMap = new HashMap<>(); + Map<EventColumnName, Object> eventsResult = ColumnRWHelper.readResults( + result, prefix, new EventColumnNameConverter()); + for (Map.Entry<EventColumnName, Object> + eventResult : eventsResult.entrySet()) { + EventColumnName eventColumnName = eventResult.getKey(); + String key = eventColumnName.getId() + + Long.toString(eventColumnName.getTimestamp()); + // Retrieve previously seen event to add to it + TimelineEvent event = eventsMap.get(key); + if (event == null) { + // First time we're seeing this event, add it to the eventsMap + event = new TimelineEvent(); + event.setId(eventColumnName.getId()); + event.setTimestamp(eventColumnName.getTimestamp()); + eventsMap.put(key, event); + } + if (eventColumnName.getInfoKey() != null) { + event.addInfo(eventColumnName.getInfoKey(), eventResult.getValue()); + } + } + Set<TimelineEvent> eventsSet = new HashSet<>(eventsMap.values()); + entity.addEvents(eventsSet); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReaderFactory.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReaderFactory.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReaderFactory.java new file mode 100644 index 0000000..fa16077 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReaderFactory.java @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage.reader; + +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; +import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve; +import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters; +import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext; + +/** + * Factory methods for instantiating a timeline entity reader. + */ +public final class TimelineEntityReaderFactory { + private TimelineEntityReaderFactory() { + } + + /** + * Creates a timeline entity reader instance for reading a single entity with + * the specified input. + * + * @param context Reader context which defines the scope in which query has to + * be made. + * @param dataToRetrieve Data to retrieve for each entity. + * @return An implementation of <cite>TimelineEntityReader</cite> object + * depending on entity type. + */ + public static TimelineEntityReader createSingleEntityReader( + TimelineReaderContext context, TimelineDataToRetrieve dataToRetrieve) { + // currently the types that are handled separate from the generic entity + // table are application, flow run, and flow activity entities + if (TimelineEntityType.YARN_APPLICATION.matches(context.getEntityType())) { + return new ApplicationEntityReader(context, dataToRetrieve); + } else if (TimelineEntityType. + YARN_FLOW_RUN.matches(context.getEntityType())) { + return new FlowRunEntityReader(context, dataToRetrieve); + } else if (TimelineEntityType. + YARN_FLOW_ACTIVITY.matches(context.getEntityType())) { + return new FlowActivityEntityReader(context, dataToRetrieve); + } else { + // assume we're dealing with a generic entity read + return new GenericEntityReader(context, dataToRetrieve); + } + } + + /** + * Creates a timeline entity reader instance for reading set of entities with + * the specified input and predicates. + * + * @param context Reader context which defines the scope in which query has to + * be made. + * @param filters Filters which limit the entities returned. + * @param dataToRetrieve Data to retrieve for each entity. + * @return An implementation of <cite>TimelineEntityReader</cite> object + * depending on entity type. + */ + public static TimelineEntityReader createMultipleEntitiesReader( + TimelineReaderContext context, TimelineEntityFilters filters, + TimelineDataToRetrieve dataToRetrieve) { + // currently the types that are handled separate from the generic entity + // table are application, flow run, and flow activity entities + if (TimelineEntityType.YARN_APPLICATION.matches(context.getEntityType())) { + return new ApplicationEntityReader(context, filters, dataToRetrieve); + } else if (TimelineEntityType. + YARN_FLOW_ACTIVITY.matches(context.getEntityType())) { + return new FlowActivityEntityReader(context, filters, dataToRetrieve); + } else if (TimelineEntityType. + YARN_FLOW_RUN.matches(context.getEntityType())) { + return new FlowRunEntityReader(context, filters, dataToRetrieve); + } else { + if (context.getDoAsUser() != null) { + return new SubApplicationEntityReader(context, filters, dataToRetrieve); + } + // assume we're dealing with a generic entity read + return new GenericEntityReader(context, filters, dataToRetrieve); + } + } + + /** + * Creates a timeline entity type reader that will read all available entity + * types within the specified context. + * + * @param context Reader context which defines the scope in which query has to + * be made. Limited to application level only. + * @return an <cite>EntityTypeReader</cite> object + */ + public static EntityTypeReader createEntityTypeReader( + TimelineReaderContext context) { + return new EntityTypeReader(context); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/package-info.java new file mode 100644 index 0000000..9814d6d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/package-info.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Package org.apache.hadoop.yarn.server.timelineservice.storage.reader + * contains classes used to read entities from backend based on query type. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +package org.apache.hadoop.yarn.server.timelineservice.storage.reader; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/subapplication/SubApplicationTableRW.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/subapplication/SubApplicationTableRW.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/subapplication/SubApplicationTableRW.java new file mode 100644 index 0000000..256b24b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/subapplication/SubApplicationTableRW.java @@ -0,0 +1,137 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage.subapplication; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.regionserver.BloomType; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTableRW; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineHBaseSchemaConstants; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Create, read and write to the SubApplication table. + */ +public class SubApplicationTableRW extends BaseTableRW<SubApplicationTable> { + /** sub app prefix. */ + private static final String PREFIX = + YarnConfiguration.TIMELINE_SERVICE_PREFIX + "subapplication"; + + /** config param name that specifies the subapplication table name. */ + public static final String TABLE_NAME_CONF_NAME = PREFIX + ".table.name"; + + /** + * config param name that specifies the TTL for metrics column family in + * subapplication table. + */ + private static final String METRICS_TTL_CONF_NAME = PREFIX + + ".table.metrics.ttl"; + + /** + * config param name that specifies max-versions for + * metrics column family in subapplication table. + */ + private static final String METRICS_MAX_VERSIONS = + PREFIX + ".table.metrics.max-versions"; + + /** default value for subapplication table name. */ + public static final String DEFAULT_TABLE_NAME = + "timelineservice.subapplication"; + + /** default TTL is 30 days for metrics timeseries. */ + private static final int DEFAULT_METRICS_TTL = 2592000; + + /** default max number of versions. */ + private static final int DEFAULT_METRICS_MAX_VERSIONS = 10000; + + private static final Logger LOG = LoggerFactory.getLogger( + SubApplicationTableRW.class); + + public SubApplicationTableRW() { + super(TABLE_NAME_CONF_NAME, DEFAULT_TABLE_NAME); + } + + /* + * (non-Javadoc) + * + * @see + * org.apache.hadoop.yarn.server.timelineservice.storage.BaseTableRW# + * createTable(org.apache.hadoop.hbase.client.Admin, + * org.apache.hadoop.conf.Configuration) + */ + public void createTable(Admin admin, Configuration hbaseConf) + throws IOException { + + TableName table = getTableName(hbaseConf); + if (admin.tableExists(table)) { + // do not disable / delete existing table + // similar to the approach taken by map-reduce jobs when + // output directory exists + throw new IOException("Table " + table.getNameAsString() + + " already exists."); + } + + HTableDescriptor subAppTableDescp = new HTableDescriptor(table); + HColumnDescriptor infoCF = + new HColumnDescriptor(SubApplicationColumnFamily.INFO.getBytes()); + infoCF.setBloomFilterType(BloomType.ROWCOL); + subAppTableDescp.addFamily(infoCF); + + HColumnDescriptor configCF = + new HColumnDescriptor(SubApplicationColumnFamily.CONFIGS.getBytes()); + configCF.setBloomFilterType(BloomType.ROWCOL); + configCF.setBlockCacheEnabled(true); + subAppTableDescp.addFamily(configCF); + + HColumnDescriptor metricsCF = + new HColumnDescriptor(SubApplicationColumnFamily.METRICS.getBytes()); + subAppTableDescp.addFamily(metricsCF); + metricsCF.setBlockCacheEnabled(true); + // always keep 1 version (the latest) + metricsCF.setMinVersions(1); + metricsCF.setMaxVersions( + hbaseConf.getInt(METRICS_MAX_VERSIONS, DEFAULT_METRICS_MAX_VERSIONS)); + metricsCF.setTimeToLive(hbaseConf.getInt(METRICS_TTL_CONF_NAME, + DEFAULT_METRICS_TTL)); + subAppTableDescp.setRegionSplitPolicyClassName( + "org.apache.hadoop.hbase.regionserver.KeyPrefixRegionSplitPolicy"); + subAppTableDescp.setValue("KeyPrefixRegionSplitPolicy.prefix_length", + TimelineHBaseSchemaConstants.USERNAME_SPLIT_KEY_PREFIX_LENGTH); + admin.createTable(subAppTableDescp, + TimelineHBaseSchemaConstants.getUsernameSplits()); + LOG.info("Status of table creation for " + table.getNameAsString() + "=" + + admin.tableExists(table)); + } + + /** + * @param metricsTTL time to live parameter for the metricss in this table. + * @param hbaseConf configururation in which to set the metrics TTL config + * variable. + */ + public void setMetricsTTL(int metricsTTL, Configuration hbaseConf) { + hbaseConf.setInt(METRICS_TTL_CONF_NAME, metricsTTL); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/subapplication/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/subapplication/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/subapplication/package-info.java new file mode 100644 index 0000000..52cc399 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/subapplication/package-info.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Package org.apache.hadoop.yarn.server.timelineservice.storage.subapplication + * contains classes related to implementation for subapplication table. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +package org.apache.hadoop.yarn.server.timelineservice.storage.subapplication; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestHBaseTimelineStorageUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestHBaseTimelineStorageUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestHBaseTimelineStorageUtils.java new file mode 100644 index 0000000..402a89b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestHBaseTimelineStorageUtils.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.storage.common; + +import org.junit.Test; + +/** + * Unit tests for HBaseTimelineStorageUtils static methos. + */ +public class TestHBaseTimelineStorageUtils { + + @Test(expected=NullPointerException.class) + public void testGetTimelineServiceHBaseConfNullArgument() throws Exception { + HBaseTimelineStorageUtils.getTimelineServiceHBaseConf(null); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-common/pom.xml ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-common/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-common/pom.xml new file mode 100644 index 0000000..cb0d6e2 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-common/pom.xml @@ -0,0 +1,132 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 + http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>hadoop-yarn-server-timelineservice-hbase</artifactId> + <groupId>org.apache.hadoop</groupId> + <version>3.2.0-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>hadoop-yarn-server-timelineservice-hbase-common</artifactId> + <name>Apache Hadoop YARN TimelineService HBase Common</name> + <version>3.2.0-SNAPSHOT</version> + + <properties> + <!-- Needed for generating FindBugs warnings using parent pom --> + <yarn.basedir>${project.parent.parent.parent.basedir}</yarn.basedir> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-annotations</artifactId> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-api</artifactId> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-server-timelineservice</artifactId> + </dependency> + + <!-- This is needed for GenericObjectMapper in GenericConverter --> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-server-applicationhistoryservice</artifactId> + <scope>provided</scope> + </dependency> + + <!-- 'mvn dependency:analyze' fails to detect use of this direct + dependency --> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-common</artifactId> + <exclusions> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-core</artifactId> + </exclusion> + <exclusion> + <groupId>org.mortbay.jetty</groupId> + <artifactId>jetty-util</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <artifactId>maven-jar-plugin</artifactId> + <executions> + <execution> + <goals> + <goal>test-jar</goal> + </goals> + <phase>test-compile</phase> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-dependency-plugin</artifactId> + <executions> + <execution> + <phase>package</phase> + <goals> + <goal>copy-dependencies</goal> + </goals> + <configuration> + <includeScope>runtime</includeScope> + <excludeGroupIds>org.slf4j,org.apache.hadoop,com.github.stephenc.findbugs</excludeGroupIds> + <outputDirectory>${project.build.directory}/lib</outputDirectory> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-common/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumn.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-common/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumn.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-common/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumn.java new file mode 100644 index 0000000..c3d6a52 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-common/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumn.java @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage.application; + +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.Column; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.GenericConverter; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute; + +/** + * Identifies fully qualified columns for the {@link ApplicationTable}. + */ +public enum ApplicationColumn implements Column<ApplicationTable> { + + /** + * App id. + */ + ID(ApplicationColumnFamily.INFO, "id"), + + /** + * When the application was created. + */ + CREATED_TIME(ApplicationColumnFamily.INFO, "created_time", + new LongConverter()), + + /** + * The version of the flow that this app belongs to. + */ + FLOW_VERSION(ApplicationColumnFamily.INFO, "flow_version"); + + private final ColumnFamily<ApplicationTable> columnFamily; + private final String columnQualifier; + private final byte[] columnQualifierBytes; + private final ValueConverter valueConverter; + + private ApplicationColumn(ColumnFamily<ApplicationTable> columnFamily, + String columnQualifier) { + this(columnFamily, columnQualifier, GenericConverter.getInstance()); + } + + private ApplicationColumn(ColumnFamily<ApplicationTable> columnFamily, + String columnQualifier, ValueConverter converter) { + this.columnFamily = columnFamily; + this.columnQualifier = columnQualifier; + // Future-proof by ensuring the right column prefix hygiene. + this.columnQualifierBytes = + Bytes.toBytes(Separator.SPACE.encode(columnQualifier)); + this.valueConverter = converter; + } + + /** + * @return the column name value + */ + private String getColumnQualifier() { + return columnQualifier; + } + + @Override + public byte[] getColumnQualifierBytes() { + return columnQualifierBytes.clone(); + } + + @Override + public byte[] getColumnFamilyBytes() { + return columnFamily.getBytes(); + } + + @Override + public ValueConverter getValueConverter() { + return valueConverter; + } + + @Override + public Attribute[] getCombinedAttrsWithAggr(Attribute... attributes) { + return attributes; + } + + @Override + public boolean supplementCellTimestamp() { + return false; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-common/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnFamily.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-common/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnFamily.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-common/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnFamily.java new file mode 100644 index 0000000..97e5f7b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-common/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnFamily.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage.application; + +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; + +/** + * Represents the application table column families. + */ +public enum ApplicationColumnFamily implements ColumnFamily<ApplicationTable> { + + /** + * Info column family houses known columns, specifically ones included in + * columnfamily filters. + */ + INFO("i"), + + /** + * Configurations are in a separate column family for two reasons: a) the size + * of the config values can be very large and b) we expect that config values + * are often separately accessed from other metrics and info columns. + */ + CONFIGS("c"), + + /** + * Metrics have a separate column family, because they have a separate TTL. + */ + METRICS("m"); + + /** + * Byte representation of this column family. + */ + private final byte[] bytes; + + /** + * @param value create a column family with this name. Must be lower case and + * without spaces. + */ + private ApplicationColumnFamily(String value) { + // column families should be lower case and not contain any spaces. + this.bytes = Bytes.toBytes(Separator.SPACE.encode(value)); + } + + public byte[] getBytes() { + return Bytes.copy(bytes); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-common/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-common/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-common/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java new file mode 100644 index 0000000..89412f4 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-common/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java @@ -0,0 +1,150 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage.application; + +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.GenericConverter; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute; + +/** + * Identifies partially qualified columns for the application table. + */ +public enum ApplicationColumnPrefix implements ColumnPrefix<ApplicationTable> { + + /** + * To store TimelineEntity getIsRelatedToEntities values. + */ + IS_RELATED_TO(ApplicationColumnFamily.INFO, "s"), + + /** + * To store TimelineEntity getRelatesToEntities values. + */ + RELATES_TO(ApplicationColumnFamily.INFO, "r"), + + /** + * To store TimelineEntity info values. + */ + INFO(ApplicationColumnFamily.INFO, "i"), + + /** + * Lifecycle events for an application. + */ + EVENT(ApplicationColumnFamily.INFO, "e"), + + /** + * Config column stores configuration with config key as the column name. + */ + CONFIG(ApplicationColumnFamily.CONFIGS, null), + + /** + * Metrics are stored with the metric name as the column name. + */ + METRIC(ApplicationColumnFamily.METRICS, null, new LongConverter()); + + private final ColumnFamily<ApplicationTable> columnFamily; + + /** + * Can be null for those cases where the provided column qualifier is the + * entire column name. + */ + private final String columnPrefix; + private final byte[] columnPrefixBytes; + private final ValueConverter valueConverter; + + /** + * Private constructor, meant to be used by the enum definition. + * + * @param columnFamily that this column is stored in. + * @param columnPrefix for this column. + */ + private ApplicationColumnPrefix(ColumnFamily<ApplicationTable> columnFamily, + String columnPrefix) { + this(columnFamily, columnPrefix, GenericConverter.getInstance()); + } + + /** + * Private constructor, meant to be used by the enum definition. + * + * @param columnFamily that this column is stored in. + * @param columnPrefix for this column. + * @param converter used to encode/decode values to be stored in HBase for + * this column prefix. + */ + private ApplicationColumnPrefix(ColumnFamily<ApplicationTable> columnFamily, + String columnPrefix, ValueConverter converter) { + this.valueConverter = converter; + this.columnFamily = columnFamily; + this.columnPrefix = columnPrefix; + if (columnPrefix == null) { + this.columnPrefixBytes = null; + } else { + // Future-proof by ensuring the right column prefix hygiene. + this.columnPrefixBytes = + Bytes.toBytes(Separator.SPACE.encode(columnPrefix)); + } + } + + /** + * @return the column name value + */ + private String getColumnPrefix() { + return columnPrefix; + } + + @Override + public byte[] getColumnPrefixBytes(byte[] qualifierPrefix) { + return ColumnHelper.getColumnQualifier( + this.columnPrefixBytes, qualifierPrefix); + } + + @Override + public byte[] getColumnPrefixBytes(String qualifierPrefix) { + return ColumnHelper.getColumnQualifier( + this.columnPrefixBytes, qualifierPrefix); + } + + @Override + public byte[] getColumnFamilyBytes() { + return columnFamily.getBytes(); + } + + @Override + public byte[] getColumnPrefixInBytes() { + return columnPrefixBytes != null ? columnPrefixBytes.clone() : null; + } + + @Override + public Attribute[] getCombinedAttrsWithAggr(Attribute... attributes) { + return attributes; + } + + @Override + public boolean supplementCellTimeStamp() { + return false; + } + + public ValueConverter getValueConverter() { + return valueConverter; + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org