http://git-wip-us.apache.org/repos/asf/hadoop/blob/b92089c0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/AbstractTimelineStorageReader.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/AbstractTimelineStorageReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/AbstractTimelineStorageReader.java deleted file mode 100644 index 5bacf66..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/AbstractTimelineStorageReader.java +++ /dev/null @@ -1,158 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.server.timelineservice.storage.reader; - -import java.io.IOException; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext; -import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowColumnPrefix; -import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey; -import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable; -import org.apache.hadoop.yarn.webapp.NotFoundException; - -/** - * The base class for reading timeline data from the HBase storage. This class - * provides basic support to validate and augment reader context. - */ -public abstract class AbstractTimelineStorageReader { - - private final TimelineReaderContext context; - /** - * Used to look up the flow context. - */ - private final AppToFlowTable appToFlowTable = new AppToFlowTable(); - - public AbstractTimelineStorageReader(TimelineReaderContext ctxt) { - context = ctxt; - } - - protected TimelineReaderContext getContext() { - return context; - } - - /** - * Looks up flow context from AppToFlow table. - * - * @param appToFlowRowKey to identify Cluster and App Ids. - * @param clusterId the cluster id. - * @param hbaseConf HBase configuration. - * @param conn HBase Connection. - * @return flow context information. - * @throws IOException if any problem occurs while fetching flow information. - */ - protected FlowContext lookupFlowContext(AppToFlowRowKey appToFlowRowKey, - String clusterId, Configuration hbaseConf, Connection conn) - throws IOException { - byte[] rowKey = appToFlowRowKey.getRowKey(); - Get get = new Get(rowKey); - Result result = appToFlowTable.getResult(hbaseConf, conn, get); - if (result != null && !result.isEmpty()) { - Object flowName = - AppToFlowColumnPrefix.FLOW_NAME.readResult(result, clusterId); - Object flowRunId = - AppToFlowColumnPrefix.FLOW_RUN_ID.readResult(result, clusterId); - Object userId = - AppToFlowColumnPrefix.USER_ID.readResult(result, clusterId); - if (flowName == null || userId == null || flowRunId == null) { - throw new NotFoundException( - "Unable to find the context flow name, and flow run id, " - + "and user id for clusterId=" + clusterId - + ", appId=" + appToFlowRowKey.getAppId()); - } - return new FlowContext((String)userId, (String)flowName, - ((Number)flowRunId).longValue()); - } else { - throw new NotFoundException( - "Unable to find the context flow name, and flow run id, " - + "and user id for clusterId=" + clusterId - + ", appId=" + appToFlowRowKey.getAppId()); - } - } - - /** - * Sets certain parameters to defaults if the values are not provided. - * - * @param hbaseConf HBase Configuration. - * @param conn HBase Connection. - * @throws IOException if any exception is encountered while setting params. - */ - protected void augmentParams(Configuration hbaseConf, Connection conn) - throws IOException { - defaultAugmentParams(hbaseConf, conn); - } - - /** - * Default behavior for all timeline readers to augment parameters. - * - * @param hbaseConf HBase Configuration. - * @param conn HBase Connection. - * @throws IOException if any exception is encountered while setting params. - */ - final protected void defaultAugmentParams(Configuration hbaseConf, - Connection conn) throws IOException { - // In reality all three should be null or neither should be null - if (context.getFlowName() == null || context.getFlowRunId() == null - || context.getUserId() == null) { - // Get flow context information from AppToFlow table. - AppToFlowRowKey appToFlowRowKey = - new AppToFlowRowKey(context.getAppId()); - FlowContext flowContext = - lookupFlowContext(appToFlowRowKey, context.getClusterId(), hbaseConf, - conn); - context.setFlowName(flowContext.flowName); - context.setFlowRunId(flowContext.flowRunId); - context.setUserId(flowContext.userId); - } - } - - /** - * Validates the required parameters to read the entities. - */ - protected abstract void validateParams(); - - /** - * Encapsulates flow context information. - */ - protected static class FlowContext { - private final String userId; - private final String flowName; - private final Long flowRunId; - - public FlowContext(String user, String flowName, Long flowRunId) { - this.userId = user; - this.flowName = flowName; - this.flowRunId = flowRunId; - } - - protected String getUserId() { - return userId; - } - - protected String getFlowName() { - return flowName; - } - - protected Long getFlowRunId() { - return flowRunId; - } - } -}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b92089c0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java deleted file mode 100644 index 4e8286d..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java +++ /dev/null @@ -1,502 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.server.timelineservice.storage.reader; - -import java.io.IOException; -import java.util.EnumSet; -import java.util.Set; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.filter.BinaryComparator; -import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; -import org.apache.hadoop.hbase.filter.FamilyFilter; -import org.apache.hadoop.hbase.filter.FilterList; -import org.apache.hadoop.hbase.filter.FilterList.Operator; -import org.apache.hadoop.hbase.filter.PageFilter; -import org.apache.hadoop.hbase.filter.QualifierFilter; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; -import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve; -import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters; -import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext; -import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList; -import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterUtils; -import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; -import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumn; -import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnFamily; -import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix; -import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey; -import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKeyPrefix; -import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable; -import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.RowKeyPrefix; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; - -import com.google.common.base.Preconditions; - -/** - * Timeline entity reader for application entities that are stored in the - * application table. - */ -class ApplicationEntityReader extends GenericEntityReader { - private static final ApplicationTable APPLICATION_TABLE = - new ApplicationTable(); - - public ApplicationEntityReader(TimelineReaderContext ctxt, - TimelineEntityFilters entityFilters, TimelineDataToRetrieve toRetrieve) { - super(ctxt, entityFilters, toRetrieve); - } - - public ApplicationEntityReader(TimelineReaderContext ctxt, - TimelineDataToRetrieve toRetrieve) { - super(ctxt, toRetrieve); - } - - /** - * Uses the {@link ApplicationTable}. - */ - protected BaseTable<?> getTable() { - return APPLICATION_TABLE; - } - - /** - * This method is called only for multiple entity reads. - */ - @Override - protected FilterList constructFilterListBasedOnFilters() throws IOException { - // Filters here cannot be null for multiple entity reads as they are set in - // augmentParams if null. - TimelineEntityFilters filters = getFilters(); - FilterList listBasedOnFilters = new FilterList(); - // Create filter list based on created time range and add it to - // listBasedOnFilters. - long createdTimeBegin = filters.getCreatedTimeBegin(); - long createdTimeEnd = filters.getCreatedTimeEnd(); - if (createdTimeBegin != 0 || createdTimeEnd != Long.MAX_VALUE) { - listBasedOnFilters.addFilter( - TimelineFilterUtils.createSingleColValueFiltersByRange( - ApplicationColumn.CREATED_TIME, createdTimeBegin, createdTimeEnd)); - } - // Create filter list based on metric filters and add it to - // listBasedOnFilters. - TimelineFilterList metricFilters = filters.getMetricFilters(); - if (metricFilters != null && !metricFilters.getFilterList().isEmpty()) { - listBasedOnFilters.addFilter( - TimelineFilterUtils.createHBaseFilterList( - ApplicationColumnPrefix.METRIC, metricFilters)); - } - // Create filter list based on config filters and add it to - // listBasedOnFilters. - TimelineFilterList configFilters = filters.getConfigFilters(); - if (configFilters != null && !configFilters.getFilterList().isEmpty()) { - listBasedOnFilters.addFilter( - TimelineFilterUtils.createHBaseFilterList( - ApplicationColumnPrefix.CONFIG, configFilters)); - } - // Create filter list based on info filters and add it to listBasedOnFilters - TimelineFilterList infoFilters = filters.getInfoFilters(); - if (infoFilters != null && !infoFilters.getFilterList().isEmpty()) { - listBasedOnFilters.addFilter( - TimelineFilterUtils.createHBaseFilterList( - ApplicationColumnPrefix.INFO, infoFilters)); - } - return listBasedOnFilters; - } - - /** - * Add {@link QualifierFilter} filters to filter list for each column of - * application table. - * - * @param list filter list to which qualifier filters have to be added. - */ - @Override - protected void updateFixedColumns(FilterList list) { - for (ApplicationColumn column : ApplicationColumn.values()) { - list.addFilter(new QualifierFilter(CompareOp.EQUAL, - new BinaryComparator(column.getColumnQualifierBytes()))); - } - } - - /** - * Creates a filter list which indicates that only some of the column - * qualifiers in the info column family will be returned in result. - * - * @return filter list. - * @throws IOException if any problem occurs while creating filter list. - */ - private FilterList createFilterListForColsOfInfoFamily() - throws IOException { - FilterList infoFamilyColsFilter = new FilterList(Operator.MUST_PASS_ONE); - // Add filters for each column in entity table. - updateFixedColumns(infoFamilyColsFilter); - EnumSet<Field> fieldsToRetrieve = getDataToRetrieve().getFieldsToRetrieve(); - // If INFO field has to be retrieved, add a filter for fetching columns - // with INFO column prefix. - if (hasField(fieldsToRetrieve, Field.INFO)) { - infoFamilyColsFilter.addFilter( - TimelineFilterUtils.createHBaseQualifierFilter( - CompareOp.EQUAL, ApplicationColumnPrefix.INFO)); - } - TimelineFilterList relatesTo = getFilters().getRelatesTo(); - if (hasField(fieldsToRetrieve, Field.RELATES_TO)) { - // If RELATES_TO field has to be retrieved, add a filter for fetching - // columns with RELATES_TO column prefix. - infoFamilyColsFilter.addFilter( - TimelineFilterUtils.createHBaseQualifierFilter( - CompareOp.EQUAL, ApplicationColumnPrefix.RELATES_TO)); - } else if (relatesTo != null && !relatesTo.getFilterList().isEmpty()) { - // Even if fields to retrieve does not contain RELATES_TO, we still - // need to have a filter to fetch some of the column qualifiers if - // relatesTo filters are specified. relatesTo filters will then be - // matched after fetching rows from HBase. - Set<String> relatesToCols = - TimelineFilterUtils.fetchColumnsFromFilterList(relatesTo); - infoFamilyColsFilter.addFilter(createFiltersFromColumnQualifiers( - ApplicationColumnPrefix.RELATES_TO, relatesToCols)); - } - TimelineFilterList isRelatedTo = getFilters().getIsRelatedTo(); - if (hasField(fieldsToRetrieve, Field.IS_RELATED_TO)) { - // If IS_RELATED_TO field has to be retrieved, add a filter for fetching - // columns with IS_RELATED_TO column prefix. - infoFamilyColsFilter.addFilter( - TimelineFilterUtils.createHBaseQualifierFilter( - CompareOp.EQUAL, ApplicationColumnPrefix.IS_RELATED_TO)); - } else if (isRelatedTo != null && !isRelatedTo.getFilterList().isEmpty()) { - // Even if fields to retrieve does not contain IS_RELATED_TO, we still - // need to have a filter to fetch some of the column qualifiers if - // isRelatedTo filters are specified. isRelatedTo filters will then be - // matched after fetching rows from HBase. - Set<String> isRelatedToCols = - TimelineFilterUtils.fetchColumnsFromFilterList(isRelatedTo); - infoFamilyColsFilter.addFilter(createFiltersFromColumnQualifiers( - ApplicationColumnPrefix.IS_RELATED_TO, isRelatedToCols)); - } - TimelineFilterList eventFilters = getFilters().getEventFilters(); - if (hasField(fieldsToRetrieve, Field.EVENTS)) { - // If EVENTS field has to be retrieved, add a filter for fetching columns - // with EVENT column prefix. - infoFamilyColsFilter.addFilter( - TimelineFilterUtils.createHBaseQualifierFilter( - CompareOp.EQUAL, ApplicationColumnPrefix.EVENT)); - } else if (eventFilters != null && !eventFilters.getFilterList().isEmpty()){ - // Even if fields to retrieve does not contain EVENTS, we still need to - // have a filter to fetch some of the column qualifiers on the basis of - // event filters specified. Event filters will then be matched after - // fetching rows from HBase. - Set<String> eventCols = - TimelineFilterUtils.fetchColumnsFromFilterList(eventFilters); - infoFamilyColsFilter.addFilter(createFiltersFromColumnQualifiers( - ApplicationColumnPrefix.EVENT, eventCols)); - } - return infoFamilyColsFilter; - } - - /** - * Exclude column prefixes via filters which are not required(based on fields - * to retrieve) from info column family. These filters are added to filter - * list which contains a filter for getting info column family. - * - * @param infoColFamilyList filter list for info column family. - */ - private void excludeFieldsFromInfoColFamily(FilterList infoColFamilyList) { - EnumSet<Field> fieldsToRetrieve = getDataToRetrieve().getFieldsToRetrieve(); - // Events not required. - if (!hasField(fieldsToRetrieve, Field.EVENTS)) { - infoColFamilyList.addFilter( - TimelineFilterUtils.createHBaseQualifierFilter( - CompareOp.NOT_EQUAL, ApplicationColumnPrefix.EVENT)); - } - // info not required. - if (!hasField(fieldsToRetrieve, Field.INFO)) { - infoColFamilyList.addFilter( - TimelineFilterUtils.createHBaseQualifierFilter( - CompareOp.NOT_EQUAL, ApplicationColumnPrefix.INFO)); - } - // is related to not required. - if (!hasField(fieldsToRetrieve, Field.IS_RELATED_TO)) { - infoColFamilyList.addFilter( - TimelineFilterUtils.createHBaseQualifierFilter( - CompareOp.NOT_EQUAL, ApplicationColumnPrefix.IS_RELATED_TO)); - } - // relates to not required. - if (!hasField(fieldsToRetrieve, Field.RELATES_TO)) { - infoColFamilyList.addFilter( - TimelineFilterUtils.createHBaseQualifierFilter( - CompareOp.NOT_EQUAL, ApplicationColumnPrefix.RELATES_TO)); - } - } - - /** - * Updates filter list based on fields for confs and metrics to retrieve. - * - * @param listBasedOnFields filter list based on fields. - * @throws IOException if any problem occurs while updating filter list. - */ - private void updateFilterForConfsAndMetricsToRetrieve( - FilterList listBasedOnFields) throws IOException { - TimelineDataToRetrieve dataToRetrieve = getDataToRetrieve(); - // Please note that if confsToRetrieve is specified, we would have added - // CONFS to fields to retrieve in augmentParams() even if not specified. - if (dataToRetrieve.getFieldsToRetrieve().contains(Field.CONFIGS)) { - // Create a filter list for configs. - listBasedOnFields.addFilter(TimelineFilterUtils. - createFilterForConfsOrMetricsToRetrieve( - dataToRetrieve.getConfsToRetrieve(), - ApplicationColumnFamily.CONFIGS, ApplicationColumnPrefix.CONFIG)); - } - - // Please note that if metricsToRetrieve is specified, we would have added - // METRICS to fields to retrieve in augmentParams() even if not specified. - if (dataToRetrieve.getFieldsToRetrieve().contains(Field.METRICS)) { - // Create a filter list for metrics. - listBasedOnFields.addFilter(TimelineFilterUtils. - createFilterForConfsOrMetricsToRetrieve( - dataToRetrieve.getMetricsToRetrieve(), - ApplicationColumnFamily.METRICS, ApplicationColumnPrefix.METRIC)); - } - } - - @Override - protected FilterList constructFilterListBasedOnFields() throws IOException { - if (!needCreateFilterListBasedOnFields()) { - // Fetch all the columns. No need of a filter. - return null; - } - FilterList listBasedOnFields = new FilterList(Operator.MUST_PASS_ONE); - FilterList infoColFamilyList = new FilterList(); - // By default fetch everything in INFO column family. - FamilyFilter infoColumnFamily = - new FamilyFilter(CompareOp.EQUAL, - new BinaryComparator(ApplicationColumnFamily.INFO.getBytes())); - infoColFamilyList.addFilter(infoColumnFamily); - if (!isSingleEntityRead() && fetchPartialColsFromInfoFamily()) { - // We can fetch only some of the columns from info family. - infoColFamilyList.addFilter(createFilterListForColsOfInfoFamily()); - } else { - // Exclude column prefixes in info column family which are not required - // based on fields to retrieve. - excludeFieldsFromInfoColFamily(infoColFamilyList); - } - listBasedOnFields.addFilter(infoColFamilyList); - - updateFilterForConfsAndMetricsToRetrieve(listBasedOnFields); - return listBasedOnFields; - } - - @Override - protected Result getResult(Configuration hbaseConf, Connection conn, - FilterList filterList) throws IOException { - TimelineReaderContext context = getContext(); - ApplicationRowKey applicationRowKey = - new ApplicationRowKey(context.getClusterId(), context.getUserId(), - context.getFlowName(), context.getFlowRunId(), context.getAppId()); - byte[] rowKey = applicationRowKey.getRowKey(); - Get get = new Get(rowKey); - get.setMaxVersions(getDataToRetrieve().getMetricsLimit()); - if (filterList != null && !filterList.getFilters().isEmpty()) { - get.setFilter(filterList); - } - return getTable().getResult(hbaseConf, conn, get); - } - - @Override - protected void validateParams() { - Preconditions.checkNotNull(getContext(), "context shouldn't be null"); - Preconditions.checkNotNull( - getDataToRetrieve(), "data to retrieve shouldn't be null"); - Preconditions.checkNotNull(getContext().getClusterId(), - "clusterId shouldn't be null"); - Preconditions.checkNotNull(getContext().getEntityType(), - "entityType shouldn't be null"); - if (isSingleEntityRead()) { - Preconditions.checkNotNull(getContext().getAppId(), - "appId shouldn't be null"); - } else { - Preconditions.checkNotNull(getContext().getUserId(), - "userId shouldn't be null"); - Preconditions.checkNotNull(getContext().getFlowName(), - "flowName shouldn't be null"); - } - } - - @Override - protected void augmentParams(Configuration hbaseConf, Connection conn) - throws IOException { - if (isSingleEntityRead()) { - // Get flow context information from AppToFlow table. - defaultAugmentParams(hbaseConf, conn); - } - // Add configs/metrics to fields to retrieve if confsToRetrieve and/or - // metricsToRetrieve are specified. - getDataToRetrieve().addFieldsBasedOnConfsAndMetricsToRetrieve(); - if (!isSingleEntityRead()) { - createFiltersIfNull(); - } - } - - @Override - protected ResultScanner getResults(Configuration hbaseConf, - Connection conn, FilterList filterList) throws IOException { - Scan scan = new Scan(); - TimelineReaderContext context = getContext(); - RowKeyPrefix<ApplicationRowKey> applicationRowKeyPrefix = null; - - // Whether or not flowRunID is null doesn't matter, the - // ApplicationRowKeyPrefix will do the right thing. - // default mode, will always scans from beginning of entity type. - if (getFilters().getFromId() == null) { - applicationRowKeyPrefix = new ApplicationRowKeyPrefix( - context.getClusterId(), context.getUserId(), context.getFlowName(), - context.getFlowRunId()); - scan.setRowPrefixFilter(applicationRowKeyPrefix.getRowKeyPrefix()); - } else { - Long flowRunId = context.getFlowRunId(); - if (flowRunId == null) { - AppToFlowRowKey appToFlowRowKey = new AppToFlowRowKey( - getFilters().getFromId()); - FlowContext flowContext = lookupFlowContext(appToFlowRowKey, - context.getClusterId(), hbaseConf, conn); - flowRunId = flowContext.getFlowRunId(); - } - - ApplicationRowKey applicationRowKey = - new ApplicationRowKey(context.getClusterId(), context.getUserId(), - context.getFlowName(), flowRunId, getFilters().getFromId()); - - // set start row - scan.setStartRow(applicationRowKey.getRowKey()); - - // get the bytes for stop row - applicationRowKeyPrefix = new ApplicationRowKeyPrefix( - context.getClusterId(), context.getUserId(), context.getFlowName(), - context.getFlowRunId()); - - // set stop row - scan.setStopRow( - HBaseTimelineStorageUtils.calculateTheClosestNextRowKeyForPrefix( - applicationRowKeyPrefix.getRowKeyPrefix())); - } - - FilterList newList = new FilterList(); - newList.addFilter(new PageFilter(getFilters().getLimit())); - if (filterList != null && !filterList.getFilters().isEmpty()) { - newList.addFilter(filterList); - } - scan.setFilter(newList); - scan.setMaxVersions(getDataToRetrieve().getMetricsLimit()); - return getTable().getResultScanner(hbaseConf, conn, scan); - } - - @Override - protected TimelineEntity parseEntity(Result result) throws IOException { - if (result == null || result.isEmpty()) { - return null; - } - TimelineEntity entity = new TimelineEntity(); - entity.setType(TimelineEntityType.YARN_APPLICATION.toString()); - String entityId = ApplicationColumn.ID.readResult(result).toString(); - entity.setId(entityId); - - TimelineEntityFilters filters = getFilters(); - // fetch created time - Long createdTime = (Long) ApplicationColumn.CREATED_TIME.readResult(result); - entity.setCreatedTime(createdTime); - - EnumSet<Field> fieldsToRetrieve = getDataToRetrieve().getFieldsToRetrieve(); - // fetch is related to entities and match isRelatedTo filter. If isRelatedTo - // filters do not match, entity would be dropped. We have to match filters - // locally as relevant HBase filters to filter out rows on the basis of - // isRelatedTo are not set in HBase scan. - boolean checkIsRelatedTo = - !isSingleEntityRead() && filters.getIsRelatedTo() != null && - filters.getIsRelatedTo().getFilterList().size() > 0; - if (hasField(fieldsToRetrieve, Field.IS_RELATED_TO) || checkIsRelatedTo) { - readRelationship(entity, result, ApplicationColumnPrefix.IS_RELATED_TO, - true); - if (checkIsRelatedTo && !TimelineStorageUtils.matchIsRelatedTo(entity, - filters.getIsRelatedTo())) { - return null; - } - if (!hasField(fieldsToRetrieve, - Field.IS_RELATED_TO)) { - entity.getIsRelatedToEntities().clear(); - } - } - - // fetch relates to entities and match relatesTo filter. If relatesTo - // filters do not match, entity would be dropped. We have to match filters - // locally as relevant HBase filters to filter out rows on the basis of - // relatesTo are not set in HBase scan. - boolean checkRelatesTo = - !isSingleEntityRead() && filters.getRelatesTo() != null && - filters.getRelatesTo().getFilterList().size() > 0; - if (hasField(fieldsToRetrieve, Field.RELATES_TO) || - checkRelatesTo) { - readRelationship(entity, result, ApplicationColumnPrefix.RELATES_TO, - false); - if (checkRelatesTo && !TimelineStorageUtils.matchRelatesTo(entity, - filters.getRelatesTo())) { - return null; - } - if (!hasField(fieldsToRetrieve, Field.RELATES_TO)) { - entity.getRelatesToEntities().clear(); - } - } - - // fetch info if fieldsToRetrieve contains INFO or ALL. - if (hasField(fieldsToRetrieve, Field.INFO)) { - readKeyValuePairs(entity, result, ApplicationColumnPrefix.INFO, false); - } - - // fetch configs if fieldsToRetrieve contains CONFIGS or ALL. - if (hasField(fieldsToRetrieve, Field.CONFIGS)) { - readKeyValuePairs(entity, result, ApplicationColumnPrefix.CONFIG, true); - } - - // fetch events and match event filters if they exist. If event filters do - // not match, entity would be dropped. We have to match filters locally - // as relevant HBase filters to filter out rows on the basis of events - // are not set in HBase scan. - boolean checkEvents = - !isSingleEntityRead() && filters.getEventFilters() != null && - filters.getEventFilters().getFilterList().size() > 0; - if (hasField(fieldsToRetrieve, Field.EVENTS) || checkEvents) { - readEvents(entity, result, ApplicationColumnPrefix.EVENT); - if (checkEvents && !TimelineStorageUtils.matchEventFilters(entity, - filters.getEventFilters())) { - return null; - } - if (!hasField(fieldsToRetrieve, Field.EVENTS)) { - entity.getEvents().clear(); - } - } - - // fetch metrics if fieldsToRetrieve contains METRICS or ALL. - if (hasField(fieldsToRetrieve, Field.METRICS)) { - readMetrics(entity, result, ApplicationColumnPrefix.METRIC); - } - return entity; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b92089c0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/EntityTypeReader.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/EntityTypeReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/EntityTypeReader.java deleted file mode 100644 index fd85878..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/EntityTypeReader.java +++ /dev/null @@ -1,180 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.server.timelineservice.storage.reader; - -import com.google.common.base.Preconditions; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.filter.FilterList; -import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter; -import org.apache.hadoop.hbase.filter.KeyOnlyFilter; -import org.apache.hadoop.hbase.filter.PageFilter; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; -import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; -import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey; -import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKeyPrefix; -import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable; - -import java.io.IOException; -import java.util.Arrays; -import java.util.Set; -import java.util.TreeSet; - -/** - * Timeline entity reader for listing all available entity types given one - * reader context. Right now only supports listing all entity types within one - * YARN application. - */ -public final class EntityTypeReader extends AbstractTimelineStorageReader { - - private static final Log LOG = LogFactory.getLog(EntityTypeReader.class); - private static final EntityTable ENTITY_TABLE = new EntityTable(); - - public EntityTypeReader(TimelineReaderContext context) { - super(context); - } - - /** - * Reads a set of timeline entity types from the HBase storage for the given - * context. - * - * @param hbaseConf HBase Configuration. - * @param conn HBase Connection. - * @return a set of <cite>TimelineEntity</cite> objects, with only type field - * set. - * @throws IOException if any exception is encountered while reading entities. - */ - public Set<String> readEntityTypes(Configuration hbaseConf, - Connection conn) throws IOException { - - validateParams(); - augmentParams(hbaseConf, conn); - - Set<String> types = new TreeSet<>(); - TimelineReaderContext context = getContext(); - EntityRowKeyPrefix prefix = new EntityRowKeyPrefix(context.getClusterId(), - context.getUserId(), context.getFlowName(), context.getFlowRunId(), - context.getAppId()); - byte[] currRowKey = prefix.getRowKeyPrefix(); - byte[] nextRowKey = prefix.getRowKeyPrefix(); - nextRowKey[nextRowKey.length - 1]++; - - FilterList typeFilterList = new FilterList(); - typeFilterList.addFilter(new FirstKeyOnlyFilter()); - typeFilterList.addFilter(new KeyOnlyFilter()); - typeFilterList.addFilter(new PageFilter(1)); - if (LOG.isDebugEnabled()) { - LOG.debug("FilterList created for scan is - " + typeFilterList); - } - - int counter = 0; - while (true) { - try (ResultScanner results - = getResult(hbaseConf, conn, typeFilterList, currRowKey, nextRowKey)) - { - TimelineEntity entity = parseEntityForType(results.next()); - if (entity == null) { - break; - } - ++counter; - if (!types.add(entity.getType())) { - LOG.warn("Failed to add type " + entity.getType() - + " to the result set because there is a duplicated copy. "); - } - String currType = entity.getType(); - if (LOG.isDebugEnabled()) { - LOG.debug("Current row key: " + Arrays.toString(currRowKey)); - LOG.debug("New entity type discovered: " + currType); - } - currRowKey = getNextRowKey(prefix.getRowKeyPrefix(), currType); - } - } - if (LOG.isDebugEnabled()) { - LOG.debug("Scanned " + counter + "records for " - + types.size() + "types"); - } - return types; - } - - @Override - protected void validateParams() { - Preconditions.checkNotNull(getContext(), "context shouldn't be null"); - Preconditions.checkNotNull(getContext().getClusterId(), - "clusterId shouldn't be null"); - Preconditions.checkNotNull(getContext().getAppId(), - "appId shouldn't be null"); - } - - /** - * Gets the possibly next row key prefix given current prefix and type. - * - * @param currRowKeyPrefix The current prefix that contains user, cluster, - * flow, run, and application id. - * @param entityType Current entity type. - * @return A new prefix for the possibly immediately next row key. - */ - private static byte[] getNextRowKey(byte[] currRowKeyPrefix, - String entityType) { - if (currRowKeyPrefix == null || entityType == null) { - return null; - } - - byte[] entityTypeEncoded = Separator.QUALIFIERS.join( - Separator.encode(entityType, Separator.SPACE, Separator.TAB, - Separator.QUALIFIERS), - Separator.EMPTY_BYTES); - - byte[] currRowKey - = new byte[currRowKeyPrefix.length + entityTypeEncoded.length]; - System.arraycopy(currRowKeyPrefix, 0, currRowKey, 0, - currRowKeyPrefix.length); - System.arraycopy(entityTypeEncoded, 0, currRowKey, currRowKeyPrefix.length, - entityTypeEncoded.length); - - return HBaseTimelineStorageUtils.calculateTheClosestNextRowKeyForPrefix( - currRowKey); - } - - private ResultScanner getResult(Configuration hbaseConf, Connection conn, - FilterList filterList, byte[] startPrefix, byte[] endPrefix) - throws IOException { - Scan scan = new Scan(startPrefix, endPrefix); - scan.setFilter(filterList); - scan.setSmall(true); - return ENTITY_TABLE.getResultScanner(hbaseConf, conn, scan); - } - - private TimelineEntity parseEntityForType(Result result) - throws IOException { - if (result == null || result.isEmpty()) { - return null; - } - TimelineEntity entity = new TimelineEntity(); - EntityRowKey newRowKey = EntityRowKey.parseRowKey(result.getRow()); - entity.setType(newRowKey.getEntityType()); - return entity; - } - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b92089c0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java deleted file mode 100644 index c741d0e..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java +++ /dev/null @@ -1,163 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.timelineservice.storage.reader; - -import java.io.IOException; -import java.util.Map; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.filter.FilterList; -import org.apache.hadoop.hbase.filter.PageFilter; -import org.apache.hadoop.yarn.api.records.timelineservice.FlowActivityEntity; -import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; -import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve; -import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters; -import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongKeyConverter; -import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityColumnPrefix; -import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKey; -import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKeyPrefix; -import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTable; - -import com.google.common.base.Preconditions; - -/** - * Timeline entity reader for flow activity entities that are stored in the - * flow activity table. - */ -class FlowActivityEntityReader extends TimelineEntityReader { - private static final FlowActivityTable FLOW_ACTIVITY_TABLE = - new FlowActivityTable(); - - /** - * Used to convert Long key components to and from storage format. - */ - private final KeyConverter<Long> longKeyConverter = new LongKeyConverter(); - - - public FlowActivityEntityReader(TimelineReaderContext ctxt, - TimelineEntityFilters entityFilters, TimelineDataToRetrieve toRetrieve) { - super(ctxt, entityFilters, toRetrieve); - } - - public FlowActivityEntityReader(TimelineReaderContext ctxt, - TimelineDataToRetrieve toRetrieve) { - super(ctxt, toRetrieve); - } - - /** - * Uses the {@link FlowActivityTable}. - */ - @Override - protected BaseTable<?> getTable() { - return FLOW_ACTIVITY_TABLE; - } - - @Override - protected void validateParams() { - Preconditions.checkNotNull(getContext().getClusterId(), - "clusterId shouldn't be null"); - } - - @Override - protected void augmentParams(Configuration hbaseConf, Connection conn) - throws IOException { - createFiltersIfNull(); - } - - @Override - protected FilterList constructFilterListBasedOnFilters() throws IOException { - return null; - } - - @Override - protected FilterList constructFilterListBasedOnFields() { - return null; - } - - @Override - protected Result getResult(Configuration hbaseConf, Connection conn, - FilterList filterList) throws IOException { - throw new UnsupportedOperationException( - "we don't support a single entity query"); - } - - @Override - protected ResultScanner getResults(Configuration hbaseConf, - Connection conn, FilterList filterList) throws IOException { - Scan scan = new Scan(); - String clusterId = getContext().getClusterId(); - if (getFilters().getCreatedTimeBegin() == 0L && - getFilters().getCreatedTimeEnd() == Long.MAX_VALUE) { - // All records have to be chosen. - scan.setRowPrefixFilter(new FlowActivityRowKeyPrefix(clusterId) - .getRowKeyPrefix()); - } else { - scan.setStartRow(new FlowActivityRowKeyPrefix(clusterId, getFilters() - .getCreatedTimeEnd()).getRowKeyPrefix()); - scan.setStopRow(new FlowActivityRowKeyPrefix(clusterId, (getFilters() - .getCreatedTimeBegin() <= 0 ? 0 - : (getFilters().getCreatedTimeBegin() - 1))).getRowKeyPrefix()); - } - // use the page filter to limit the result to the page size - // the scanner may still return more than the limit; therefore we need to - // read the right number as we iterate - scan.setFilter(new PageFilter(getFilters().getLimit())); - return getTable().getResultScanner(hbaseConf, conn, scan); - } - - @Override - protected TimelineEntity parseEntity(Result result) throws IOException { - FlowActivityRowKey rowKey = FlowActivityRowKey.parseRowKey(result.getRow()); - - Long time = rowKey.getDayTimestamp(); - String user = rowKey.getUserId(); - String flowName = rowKey.getFlowName(); - - FlowActivityEntity flowActivity = new FlowActivityEntity( - getContext().getClusterId(), time, user, flowName); - // set the id - flowActivity.setId(flowActivity.getId()); - // get the list of run ids along with the version that are associated with - // this flow on this day - Map<Long, Object> runIdsMap = - FlowActivityColumnPrefix.RUN_ID.readResults(result, longKeyConverter); - for (Map.Entry<Long, Object> e : runIdsMap.entrySet()) { - Long runId = e.getKey(); - String version = (String)e.getValue(); - FlowRunEntity flowRun = new FlowRunEntity(); - flowRun.setUser(user); - flowRun.setName(flowName); - flowRun.setRunId(runId); - flowRun.setVersion(version); - // set the id - flowRun.setId(flowRun.getId()); - flowActivity.addFlowRun(flowRun); - } - - return flowActivity; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b92089c0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java deleted file mode 100644 index cedf96a..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java +++ /dev/null @@ -1,290 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.server.timelineservice.storage.reader; - -import java.io.IOException; -import java.util.EnumSet; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.filter.BinaryComparator; -import org.apache.hadoop.hbase.filter.BinaryPrefixComparator; -import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; -import org.apache.hadoop.hbase.filter.FamilyFilter; -import org.apache.hadoop.hbase.filter.FilterList; -import org.apache.hadoop.hbase.filter.FilterList.Operator; -import org.apache.hadoop.hbase.filter.PageFilter; -import org.apache.hadoop.hbase.filter.QualifierFilter; -import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; -import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve; -import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters; -import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext; -import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList; -import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterUtils; -import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.RowKeyPrefix; -import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumn; -import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnFamily; -import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnPrefix; -import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKey; -import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKeyPrefix; -import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable; -import org.apache.hadoop.yarn.webapp.BadRequestException; - -import com.google.common.base.Preconditions; - -/** - * Timeline entity reader for flow run entities that are stored in the flow run - * table. - */ -class FlowRunEntityReader extends TimelineEntityReader { - private static final FlowRunTable FLOW_RUN_TABLE = new FlowRunTable(); - - public FlowRunEntityReader(TimelineReaderContext ctxt, - TimelineEntityFilters entityFilters, TimelineDataToRetrieve toRetrieve) { - super(ctxt, entityFilters, toRetrieve); - } - - public FlowRunEntityReader(TimelineReaderContext ctxt, - TimelineDataToRetrieve toRetrieve) { - super(ctxt, toRetrieve); - } - - /** - * Uses the {@link FlowRunTable}. - */ - @Override - protected BaseTable<?> getTable() { - return FLOW_RUN_TABLE; - } - - @Override - protected void validateParams() { - Preconditions.checkNotNull(getContext(), "context shouldn't be null"); - Preconditions.checkNotNull(getDataToRetrieve(), - "data to retrieve shouldn't be null"); - Preconditions.checkNotNull(getContext().getClusterId(), - "clusterId shouldn't be null"); - Preconditions.checkNotNull(getContext().getUserId(), - "userId shouldn't be null"); - Preconditions.checkNotNull(getContext().getFlowName(), - "flowName shouldn't be null"); - if (isSingleEntityRead()) { - Preconditions.checkNotNull(getContext().getFlowRunId(), - "flowRunId shouldn't be null"); - } - EnumSet<Field> fieldsToRetrieve = getDataToRetrieve().getFieldsToRetrieve(); - if (!isSingleEntityRead() && fieldsToRetrieve != null) { - for (Field field : fieldsToRetrieve) { - if (field != Field.ALL && field != Field.METRICS) { - throw new BadRequestException("Invalid field " + field - + " specified while querying flow runs."); - } - } - } - } - - @Override - protected void augmentParams(Configuration hbaseConf, Connection conn) { - // Add metrics to fields to retrieve if metricsToRetrieve is specified. - getDataToRetrieve().addFieldsBasedOnConfsAndMetricsToRetrieve(); - if (!isSingleEntityRead()) { - createFiltersIfNull(); - } - } - - protected FilterList constructFilterListBasedOnFilters() throws IOException { - FilterList listBasedOnFilters = new FilterList(); - // Filter based on created time range. - Long createdTimeBegin = getFilters().getCreatedTimeBegin(); - Long createdTimeEnd = getFilters().getCreatedTimeEnd(); - if (createdTimeBegin != 0 || createdTimeEnd != Long.MAX_VALUE) { - listBasedOnFilters.addFilter(TimelineFilterUtils - .createSingleColValueFiltersByRange(FlowRunColumn.MIN_START_TIME, - createdTimeBegin, createdTimeEnd)); - } - // Filter based on metric filters. - TimelineFilterList metricFilters = getFilters().getMetricFilters(); - if (metricFilters != null && !metricFilters.getFilterList().isEmpty()) { - listBasedOnFilters.addFilter(TimelineFilterUtils.createHBaseFilterList( - FlowRunColumnPrefix.METRIC, metricFilters)); - } - return listBasedOnFilters; - } - - /** - * Add {@link QualifierFilter} filters to filter list for each column of flow - * run table. - * - * @return filter list to which qualifier filters have been added. - */ - private FilterList updateFixedColumns() { - FilterList columnsList = new FilterList(Operator.MUST_PASS_ONE); - for (FlowRunColumn column : FlowRunColumn.values()) { - columnsList.addFilter(new QualifierFilter(CompareOp.EQUAL, - new BinaryComparator(column.getColumnQualifierBytes()))); - } - return columnsList; - } - - @Override - protected FilterList constructFilterListBasedOnFields() throws IOException { - FilterList list = new FilterList(Operator.MUST_PASS_ONE); - // By default fetch everything in INFO column family. - FamilyFilter infoColumnFamily = - new FamilyFilter(CompareOp.EQUAL, new BinaryComparator( - FlowRunColumnFamily.INFO.getBytes())); - TimelineDataToRetrieve dataToRetrieve = getDataToRetrieve(); - // If multiple entities have to be retrieved, check if metrics have to be - // retrieved and if not, add a filter so that metrics can be excluded. - // Metrics are always returned if we are reading a single entity. - if (!isSingleEntityRead() - && !hasField(dataToRetrieve.getFieldsToRetrieve(), Field.METRICS)) { - FilterList infoColFamilyList = new FilterList(Operator.MUST_PASS_ONE); - infoColFamilyList.addFilter(infoColumnFamily); - infoColFamilyList.addFilter(new QualifierFilter(CompareOp.NOT_EQUAL, - new BinaryPrefixComparator(FlowRunColumnPrefix.METRIC - .getColumnPrefixBytes("")))); - list.addFilter(infoColFamilyList); - } else { - // Check if metricsToRetrieve are specified and if they are, create a - // filter list for info column family by adding flow run tables columns - // and a list for metrics to retrieve. Pls note that fieldsToRetrieve - // will have METRICS added to it if metricsToRetrieve are specified - // (in augmentParams()). - TimelineFilterList metricsToRetrieve = - dataToRetrieve.getMetricsToRetrieve(); - if (metricsToRetrieve != null - && !metricsToRetrieve.getFilterList().isEmpty()) { - FilterList infoColFamilyList = new FilterList(); - infoColFamilyList.addFilter(infoColumnFamily); - FilterList columnsList = updateFixedColumns(); - columnsList.addFilter(TimelineFilterUtils.createHBaseFilterList( - FlowRunColumnPrefix.METRIC, metricsToRetrieve)); - infoColFamilyList.addFilter(columnsList); - list.addFilter(infoColFamilyList); - } - } - return list; - } - - @Override - protected Result getResult(Configuration hbaseConf, Connection conn, - FilterList filterList) throws IOException { - TimelineReaderContext context = getContext(); - FlowRunRowKey flowRunRowKey = - new FlowRunRowKey(context.getClusterId(), context.getUserId(), - context.getFlowName(), context.getFlowRunId()); - byte[] rowKey = flowRunRowKey.getRowKey(); - Get get = new Get(rowKey); - get.setMaxVersions(Integer.MAX_VALUE); - if (filterList != null && !filterList.getFilters().isEmpty()) { - get.setFilter(filterList); - } - return getTable().getResult(hbaseConf, conn, get); - } - - @Override - protected ResultScanner getResults(Configuration hbaseConf, Connection conn, - FilterList filterList) throws IOException { - Scan scan = new Scan(); - TimelineReaderContext context = getContext(); - RowKeyPrefix<FlowRunRowKey> flowRunRowKeyPrefix = null; - if (getFilters().getFromId() == null) { - flowRunRowKeyPrefix = new FlowRunRowKeyPrefix(context.getClusterId(), - context.getUserId(), context.getFlowName()); - scan.setRowPrefixFilter(flowRunRowKeyPrefix.getRowKeyPrefix()); - } else { - - FlowRunRowKey flowRunRowKey = - new FlowRunRowKey(context.getClusterId(), context.getUserId(), - context.getFlowName(), Long.parseLong(getFilters().getFromId())); - - // set start row - scan.setStartRow(flowRunRowKey.getRowKey()); - - // get the bytes for stop row - flowRunRowKeyPrefix = new FlowRunRowKeyPrefix(context.getClusterId(), - context.getUserId(), context.getFlowName()); - - // set stop row - scan.setStopRow( - HBaseTimelineStorageUtils.calculateTheClosestNextRowKeyForPrefix( - flowRunRowKeyPrefix.getRowKeyPrefix())); - } - - FilterList newList = new FilterList(); - newList.addFilter(new PageFilter(getFilters().getLimit())); - if (filterList != null && !filterList.getFilters().isEmpty()) { - newList.addFilter(filterList); - } - scan.setFilter(newList); - scan.setMaxVersions(Integer.MAX_VALUE); - return getTable().getResultScanner(hbaseConf, conn, scan); - } - - @Override - protected TimelineEntity parseEntity(Result result) throws IOException { - TimelineReaderContext context = getContext(); - FlowRunEntity flowRun = new FlowRunEntity(); - flowRun.setUser(context.getUserId()); - flowRun.setName(context.getFlowName()); - if (isSingleEntityRead()) { - flowRun.setRunId(context.getFlowRunId()); - } else { - FlowRunRowKey rowKey = FlowRunRowKey.parseRowKey(result.getRow()); - flowRun.setRunId(rowKey.getFlowRunId()); - } - - // read the start time - Long startTime = (Long) FlowRunColumn.MIN_START_TIME.readResult(result); - if (startTime != null) { - flowRun.setStartTime(startTime.longValue()); - } - - // read the end time if available - Long endTime = (Long) FlowRunColumn.MAX_END_TIME.readResult(result); - if (endTime != null) { - flowRun.setMaxEndTime(endTime.longValue()); - } - - // read the flow version - String version = (String) FlowRunColumn.FLOW_VERSION.readResult(result); - if (version != null) { - flowRun.setVersion(version); - } - - // read metrics if its a single entity query or if METRICS are part of - // fieldsToRetrieve. - if (isSingleEntityRead() - || hasField(getDataToRetrieve().getFieldsToRetrieve(), Field.METRICS)) { - readMetrics(flowRun, result, FlowRunColumnPrefix.METRIC); - } - - // set the id - flowRun.setId(flowRun.getId()); - return flowRun; - } -} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org