http://git-wip-us.apache.org/repos/asf/hadoop/blob/61737325/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java new file mode 100644 index 0000000..181ec81 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java @@ -0,0 +1,383 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage.reader; + +import java.io.IOException; +import java.util.EnumSet; +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.filter.BinaryComparator; +import org.apache.hadoop.hbase.filter.BinaryPrefixComparator; +import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; +import org.apache.hadoop.hbase.filter.FamilyFilter; +import org.apache.hadoop.hbase.filter.FilterList; +import org.apache.hadoop.hbase.filter.FilterList.Operator; +import org.apache.hadoop.hbase.filter.PageFilter; +import org.apache.hadoop.hbase.filter.QualifierFilter; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; +import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList; +import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterUtils; +import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader; +import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; +import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumn; +import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnFamily; +import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey; +import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; + +import com.google.common.base.Preconditions; + +/** + * Timeline entity reader for application entities that are stored in the + * application table. + */ +class ApplicationEntityReader extends GenericEntityReader { + private static final ApplicationTable APPLICATION_TABLE = + new ApplicationTable(); + + public ApplicationEntityReader(String userId, String clusterId, + String flowName, Long flowRunId, String appId, String entityType, + Long limit, Long createdTimeBegin, Long createdTimeEnd, + Long modifiedTimeBegin, Long modifiedTimeEnd, + Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo, + Map<String, Object> infoFilters, Map<String, String> configFilters, + Set<String> metricFilters, Set<String> eventFilters, + TimelineFilterList confsToRetrieve, TimelineFilterList metricsToRetrieve, + EnumSet<Field> fieldsToRetrieve) { + super(userId, clusterId, flowName, flowRunId, appId, entityType, limit, + createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd, + relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters, + eventFilters, confsToRetrieve, metricsToRetrieve, fieldsToRetrieve, + true); + } + + public ApplicationEntityReader(String userId, String clusterId, + String flowName, Long flowRunId, String appId, String entityType, + String entityId, TimelineFilterList confsToRetrieve, + TimelineFilterList metricsToRetrieve, EnumSet<Field> fieldsToRetrieve) { + super(userId, clusterId, flowName, flowRunId, appId, entityType, entityId, + confsToRetrieve, metricsToRetrieve, fieldsToRetrieve); + } + + /** + * Uses the {@link ApplicationTable}. + */ + protected BaseTable<?> getTable() { + return APPLICATION_TABLE; + } + + @Override + protected FilterList constructFilterListBasedOnFields() { + FilterList list = new FilterList(Operator.MUST_PASS_ONE); + // Fetch all the columns. + if (fieldsToRetrieve.contains(Field.ALL) && + (confsToRetrieve == null || + confsToRetrieve.getFilterList().isEmpty()) && + (metricsToRetrieve == null || + metricsToRetrieve.getFilterList().isEmpty())) { + return list; + } + FilterList infoColFamilyList = new FilterList(); + // By default fetch everything in INFO column family. + FamilyFilter infoColumnFamily = + new FamilyFilter(CompareOp.EQUAL, + new BinaryComparator(ApplicationColumnFamily.INFO.getBytes())); + infoColFamilyList.addFilter(infoColumnFamily); + // Events not required. + if (!fieldsToRetrieve.contains(Field.EVENTS) && + !fieldsToRetrieve.contains(Field.ALL) && eventFilters == null) { + infoColFamilyList.addFilter( + new QualifierFilter(CompareOp.NOT_EQUAL, + new BinaryPrefixComparator( + ApplicationColumnPrefix.EVENT.getColumnPrefixBytes("")))); + } + // info not required. + if (!fieldsToRetrieve.contains(Field.INFO) && + !fieldsToRetrieve.contains(Field.ALL) && infoFilters == null) { + infoColFamilyList.addFilter( + new QualifierFilter(CompareOp.NOT_EQUAL, + new BinaryPrefixComparator( + ApplicationColumnPrefix.INFO.getColumnPrefixBytes("")))); + } + // is releated to not required. + if (!fieldsToRetrieve.contains(Field.IS_RELATED_TO) && + !fieldsToRetrieve.contains(Field.ALL) && isRelatedTo == null) { + infoColFamilyList.addFilter( + new QualifierFilter(CompareOp.NOT_EQUAL, + new BinaryPrefixComparator( + ApplicationColumnPrefix.IS_RELATED_TO.getColumnPrefixBytes("")))); + } + // relates to not required. + if (!fieldsToRetrieve.contains(Field.RELATES_TO) && + !fieldsToRetrieve.contains(Field.ALL) && relatesTo == null) { + infoColFamilyList.addFilter( + new QualifierFilter(CompareOp.NOT_EQUAL, + new BinaryPrefixComparator( + ApplicationColumnPrefix.RELATES_TO.getColumnPrefixBytes("")))); + } + list.addFilter(infoColFamilyList); + if ((fieldsToRetrieve.contains(Field.CONFIGS) || configFilters != null) || + (confsToRetrieve != null && + !confsToRetrieve.getFilterList().isEmpty())) { + FilterList filterCfg = + new FilterList(new FamilyFilter(CompareOp.EQUAL, + new BinaryComparator(ApplicationColumnFamily.CONFIGS.getBytes()))); + if (confsToRetrieve != null && + !confsToRetrieve.getFilterList().isEmpty()) { + filterCfg.addFilter(TimelineFilterUtils.createHBaseFilterList( + ApplicationColumnPrefix.CONFIG, confsToRetrieve)); + } + list.addFilter(filterCfg); + } + if ((fieldsToRetrieve.contains(Field.METRICS) || metricFilters != null) || + (metricsToRetrieve != null && + !metricsToRetrieve.getFilterList().isEmpty())) { + FilterList filterMetrics = + new FilterList(new FamilyFilter(CompareOp.EQUAL, + new BinaryComparator(ApplicationColumnFamily.METRICS.getBytes()))); + if (metricsToRetrieve != null && + !metricsToRetrieve.getFilterList().isEmpty()) { + filterMetrics.addFilter(TimelineFilterUtils.createHBaseFilterList( + ApplicationColumnPrefix.METRIC, metricsToRetrieve)); + } + list.addFilter(filterMetrics); + } + return list; + } + + @Override + protected Result getResult(Configuration hbaseConf, Connection conn, + FilterList filterList) throws IOException { + byte[] rowKey = + ApplicationRowKey.getRowKey(clusterId, userId, flowName, flowRunId, + appId); + Get get = new Get(rowKey); + get.setMaxVersions(Integer.MAX_VALUE); + if (filterList != null && !filterList.getFilters().isEmpty()) { + get.setFilter(filterList); + } + return table.getResult(hbaseConf, conn, get); + } + + @Override + protected void validateParams() { + Preconditions.checkNotNull(clusterId, "clusterId shouldn't be null"); + Preconditions.checkNotNull(entityType, "entityType shouldn't be null"); + if (singleEntityRead) { + Preconditions.checkNotNull(appId, "appId shouldn't be null"); + } else { + Preconditions.checkNotNull(userId, "userId shouldn't be null"); + Preconditions.checkNotNull(flowName, "flowName shouldn't be null"); + } + } + + @Override + protected void augmentParams(Configuration hbaseConf, Connection conn) + throws IOException { + if (singleEntityRead) { + if (flowName == null || flowRunId == null || userId == null) { + FlowContext context = + lookupFlowContext(clusterId, appId, hbaseConf, conn); + flowName = context.flowName; + flowRunId = context.flowRunId; + userId = context.userId; + } + } + if (fieldsToRetrieve == null) { + fieldsToRetrieve = EnumSet.noneOf(Field.class); + } + if (!fieldsToRetrieve.contains(Field.CONFIGS) && + confsToRetrieve != null && !confsToRetrieve.getFilterList().isEmpty()) { + fieldsToRetrieve.add(Field.CONFIGS); + } + if (!fieldsToRetrieve.contains(Field.METRICS) && + metricsToRetrieve != null && + !metricsToRetrieve.getFilterList().isEmpty()) { + fieldsToRetrieve.add(Field.METRICS); + } + if (!singleEntityRead) { + if (limit == null || limit < 0) { + limit = TimelineReader.DEFAULT_LIMIT; + } + if (createdTimeBegin == null) { + createdTimeBegin = DEFAULT_BEGIN_TIME; + } + if (createdTimeEnd == null) { + createdTimeEnd = DEFAULT_END_TIME; + } + if (modifiedTimeBegin == null) { + modifiedTimeBegin = DEFAULT_BEGIN_TIME; + } + if (modifiedTimeEnd == null) { + modifiedTimeEnd = DEFAULT_END_TIME; + } + } + } + + @Override + protected ResultScanner getResults(Configuration hbaseConf, + Connection conn, FilterList filterList) throws IOException { + Scan scan = new Scan(); + if (flowRunId != null) { + scan.setRowPrefixFilter(ApplicationRowKey. + getRowKeyPrefix(clusterId, userId, flowName, flowRunId)); + } else { + scan.setRowPrefixFilter(ApplicationRowKey. + getRowKeyPrefix(clusterId, userId, flowName)); + } + FilterList newList = new FilterList(); + newList.addFilter(new PageFilter(limit)); + if (filterList != null && !filterList.getFilters().isEmpty()) { + newList.addFilter(filterList); + } + scan.setFilter(newList); + return table.getResultScanner(hbaseConf, conn, scan); + } + + @Override + protected TimelineEntity parseEntity(Result result) throws IOException { + if (result == null || result.isEmpty()) { + return null; + } + TimelineEntity entity = new TimelineEntity(); + entity.setType(TimelineEntityType.YARN_APPLICATION.toString()); + String entityId = ApplicationColumn.ID.readResult(result).toString(); + entity.setId(entityId); + + // fetch created time + Number createdTime = + (Number)ApplicationColumn.CREATED_TIME.readResult(result); + entity.setCreatedTime(createdTime.longValue()); + if (!singleEntityRead && (entity.getCreatedTime() < createdTimeBegin || + entity.getCreatedTime() > createdTimeEnd)) { + return null; + } + + // fetch modified time + Number modifiedTime = + (Number)ApplicationColumn.MODIFIED_TIME.readResult(result); + entity.setModifiedTime(modifiedTime.longValue()); + if (!singleEntityRead && (entity.getModifiedTime() < modifiedTimeBegin || + entity.getModifiedTime() > modifiedTimeEnd)) { + return null; + } + + // fetch is related to entities + boolean checkIsRelatedTo = isRelatedTo != null && isRelatedTo.size() > 0; + if (fieldsToRetrieve.contains(Field.ALL) || + fieldsToRetrieve.contains(Field.IS_RELATED_TO) || checkIsRelatedTo) { + readRelationship(entity, result, ApplicationColumnPrefix.IS_RELATED_TO, + true); + if (checkIsRelatedTo && !TimelineStorageUtils.matchRelations( + entity.getIsRelatedToEntities(), isRelatedTo)) { + return null; + } + if (!fieldsToRetrieve.contains(Field.ALL) && + !fieldsToRetrieve.contains(Field.IS_RELATED_TO)) { + entity.getIsRelatedToEntities().clear(); + } + } + + // fetch relates to entities + boolean checkRelatesTo = relatesTo != null && relatesTo.size() > 0; + if (fieldsToRetrieve.contains(Field.ALL) || + fieldsToRetrieve.contains(Field.RELATES_TO) || checkRelatesTo) { + readRelationship(entity, result, ApplicationColumnPrefix.RELATES_TO, + false); + if (checkRelatesTo && !TimelineStorageUtils.matchRelations( + entity.getRelatesToEntities(), relatesTo)) { + return null; + } + if (!fieldsToRetrieve.contains(Field.ALL) && + !fieldsToRetrieve.contains(Field.RELATES_TO)) { + entity.getRelatesToEntities().clear(); + } + } + + // fetch info + boolean checkInfo = infoFilters != null && infoFilters.size() > 0; + if (fieldsToRetrieve.contains(Field.ALL) || + fieldsToRetrieve.contains(Field.INFO) || checkInfo) { + readKeyValuePairs(entity, result, ApplicationColumnPrefix.INFO, false); + if (checkInfo && + !TimelineStorageUtils.matchFilters(entity.getInfo(), infoFilters)) { + return null; + } + if (!fieldsToRetrieve.contains(Field.ALL) && + !fieldsToRetrieve.contains(Field.INFO)) { + entity.getInfo().clear(); + } + } + + // fetch configs + boolean checkConfigs = configFilters != null && configFilters.size() > 0; + if (fieldsToRetrieve.contains(Field.ALL) || + fieldsToRetrieve.contains(Field.CONFIGS) || checkConfigs) { + readKeyValuePairs(entity, result, ApplicationColumnPrefix.CONFIG, true); + if (checkConfigs && !TimelineStorageUtils.matchFilters( + entity.getConfigs(), configFilters)) { + return null; + } + if (!fieldsToRetrieve.contains(Field.ALL) && + !fieldsToRetrieve.contains(Field.CONFIGS)) { + entity.getConfigs().clear(); + } + } + + // fetch events + boolean checkEvents = eventFilters != null && eventFilters.size() > 0; + if (fieldsToRetrieve.contains(Field.ALL) || + fieldsToRetrieve.contains(Field.EVENTS) || checkEvents) { + readEvents(entity, result, true); + if (checkEvents && !TimelineStorageUtils.matchEventFilters( + entity.getEvents(), eventFilters)) { + return null; + } + if (!fieldsToRetrieve.contains(Field.ALL) && + !fieldsToRetrieve.contains(Field.EVENTS)) { + entity.getEvents().clear(); + } + } + + // fetch metrics + boolean checkMetrics = metricFilters != null && metricFilters.size() > 0; + if (fieldsToRetrieve.contains(Field.ALL) || + fieldsToRetrieve.contains(Field.METRICS) || checkMetrics) { + readMetrics(entity, result, ApplicationColumnPrefix.METRIC); + if (checkMetrics && !TimelineStorageUtils.matchMetricFilters( + entity.getMetrics(), metricFilters)) { + return null; + } + if (!fieldsToRetrieve.contains(Field.ALL) && + !fieldsToRetrieve.contains(Field.METRICS)) { + entity.getMetrics().clear(); + } + } + return entity; + } +}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/61737325/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java new file mode 100644 index 0000000..52ceef8 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java @@ -0,0 +1,164 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage.reader; + +import java.io.IOException; +import java.util.EnumSet; +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.filter.FilterList; +import org.apache.hadoop.hbase.filter.PageFilter; +import org.apache.hadoop.yarn.api.records.timelineservice.FlowActivityEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader; +import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityColumnPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKey; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTable; + +import com.google.common.base.Preconditions; + +/** + * Timeline entity reader for flow activity entities that are stored in the + * flow activity table. + */ +class FlowActivityEntityReader extends TimelineEntityReader { + private static final FlowActivityTable FLOW_ACTIVITY_TABLE = + new FlowActivityTable(); + + public FlowActivityEntityReader(String userId, String clusterId, + String flowName, Long flowRunId, String appId, String entityType, + Long limit, Long createdTimeBegin, Long createdTimeEnd, + Long modifiedTimeBegin, Long modifiedTimeEnd, + Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo, + Map<String, Object> infoFilters, Map<String, String> configFilters, + Set<String> metricFilters, Set<String> eventFilters, + EnumSet<Field> fieldsToRetrieve) { + super(userId, clusterId, flowName, flowRunId, appId, entityType, limit, + createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd, + relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters, + eventFilters, null, null, fieldsToRetrieve, true); + } + + public FlowActivityEntityReader(String userId, String clusterId, + String flowName, Long flowRunId, String appId, String entityType, + String entityId, EnumSet<Field> fieldsToRetrieve) { + super(userId, clusterId, flowName, flowRunId, appId, entityType, entityId, + null, null, fieldsToRetrieve); + } + + /** + * Uses the {@link FlowActivityTable}. + */ + @Override + protected BaseTable<?> getTable() { + return FLOW_ACTIVITY_TABLE; + } + + @Override + protected void validateParams() { + Preconditions.checkNotNull(clusterId, "clusterId shouldn't be null"); + } + + @Override + protected void augmentParams(Configuration hbaseConf, Connection conn) + throws IOException { + if (limit == null || limit < 0) { + limit = TimelineReader.DEFAULT_LIMIT; + } + if (createdTimeBegin == null) { + createdTimeBegin = DEFAULT_BEGIN_TIME; + } + if (createdTimeEnd == null) { + createdTimeEnd = DEFAULT_END_TIME; + } + } + + @Override + protected FilterList constructFilterListBasedOnFields() { + return null; + } + + @Override + protected Result getResult(Configuration hbaseConf, Connection conn, + FilterList filterList) throws IOException { + throw new UnsupportedOperationException( + "we don't support a single entity query"); + } + + @Override + protected ResultScanner getResults(Configuration hbaseConf, + Connection conn, FilterList filterList) throws IOException { + Scan scan = new Scan(); + if (createdTimeBegin == DEFAULT_BEGIN_TIME && + createdTimeEnd == DEFAULT_END_TIME) { + scan.setRowPrefixFilter(FlowActivityRowKey.getRowKeyPrefix(clusterId)); + } else { + scan.setStartRow( + FlowActivityRowKey.getRowKeyPrefix(clusterId, createdTimeEnd)); + scan.setStopRow( + FlowActivityRowKey.getRowKeyPrefix(clusterId, + (createdTimeBegin <= 0 ? 0: (createdTimeBegin - 1)))); + } + // use the page filter to limit the result to the page size + // the scanner may still return more than the limit; therefore we need to + // read the right number as we iterate + scan.setFilter(new PageFilter(limit)); + return table.getResultScanner(hbaseConf, conn, scan); + } + + @Override + protected TimelineEntity parseEntity(Result result) throws IOException { + FlowActivityRowKey rowKey = FlowActivityRowKey.parseRowKey(result.getRow()); + + long time = rowKey.getDayTimestamp(); + String user = rowKey.getUserId(); + String flowName = rowKey.getFlowName(); + + FlowActivityEntity flowActivity = + new FlowActivityEntity(clusterId, time, user, flowName); + // set the id + flowActivity.setId(flowActivity.getId()); + // get the list of run ids along with the version that are associated with + // this flow on this day + Map<String, Object> runIdsMap = + FlowActivityColumnPrefix.RUN_ID.readResults(result); + for (Map.Entry<String, Object> e : runIdsMap.entrySet()) { + Long runId = Long.valueOf(e.getKey()); + String version = (String)e.getValue(); + FlowRunEntity flowRun = new FlowRunEntity(); + flowRun.setUser(user); + flowRun.setName(flowName); + flowRun.setRunId(runId); + flowRun.setVersion(version); + // set the id + flowRun.setId(flowRun.getId()); + flowActivity.addFlowRun(flowRun); + } + + return flowActivity; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/61737325/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java new file mode 100644 index 0000000..6286ee1 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java @@ -0,0 +1,226 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage.reader; + +import java.io.IOException; +import java.util.EnumSet; +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.filter.BinaryComparator; +import org.apache.hadoop.hbase.filter.BinaryPrefixComparator; +import org.apache.hadoop.hbase.filter.FamilyFilter; +import org.apache.hadoop.hbase.filter.FilterList; +import org.apache.hadoop.hbase.filter.PageFilter; +import org.apache.hadoop.hbase.filter.QualifierFilter; +import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; +import org.apache.hadoop.hbase.filter.FilterList.Operator; +import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList; +import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterUtils; +import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader; +import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumn; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnFamily; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKey; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable; + +import com.google.common.base.Preconditions; + +/** + * Timeline entity reader for flow run entities that are stored in the flow run + * table. + */ +class FlowRunEntityReader extends TimelineEntityReader { + private static final FlowRunTable FLOW_RUN_TABLE = new FlowRunTable(); + + public FlowRunEntityReader(String userId, String clusterId, + String flowName, Long flowRunId, String appId, String entityType, + Long limit, Long createdTimeBegin, Long createdTimeEnd, + Long modifiedTimeBegin, Long modifiedTimeEnd, + Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo, + Map<String, Object> infoFilters, Map<String, String> configFilters, + Set<String> metricFilters, Set<String> eventFilters, + TimelineFilterList confsToRetrieve, TimelineFilterList metricsToRetrieve, + EnumSet<Field> fieldsToRetrieve) { + super(userId, clusterId, flowName, flowRunId, appId, entityType, limit, + createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd, + relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters, + eventFilters, null, metricsToRetrieve, fieldsToRetrieve, true); + } + + public FlowRunEntityReader(String userId, String clusterId, + String flowName, Long flowRunId, String appId, String entityType, + String entityId, TimelineFilterList confsToRetrieve, + TimelineFilterList metricsToRetrieve, EnumSet<Field> fieldsToRetrieve) { + super(userId, clusterId, flowName, flowRunId, appId, entityType, entityId, + null, metricsToRetrieve, fieldsToRetrieve); + } + + /** + * Uses the {@link FlowRunTable}. + */ + @Override + protected BaseTable<?> getTable() { + return FLOW_RUN_TABLE; + } + + @Override + protected void validateParams() { + Preconditions.checkNotNull(clusterId, "clusterId shouldn't be null"); + Preconditions.checkNotNull(userId, "userId shouldn't be null"); + Preconditions.checkNotNull(flowName, "flowName shouldn't be null"); + if (singleEntityRead) { + Preconditions.checkNotNull(flowRunId, "flowRunId shouldn't be null"); + } + } + + @Override + protected void augmentParams(Configuration hbaseConf, Connection conn) { + if (!singleEntityRead) { + if (fieldsToRetrieve == null) { + fieldsToRetrieve = EnumSet.noneOf(Field.class); + } + if (limit == null || limit < 0) { + limit = TimelineReader.DEFAULT_LIMIT; + } + if (createdTimeBegin == null) { + createdTimeBegin = DEFAULT_BEGIN_TIME; + } + if (createdTimeEnd == null) { + createdTimeEnd = DEFAULT_END_TIME; + } + if (!fieldsToRetrieve.contains(Field.METRICS) && + metricsToRetrieve != null && + !metricsToRetrieve.getFilterList().isEmpty()) { + fieldsToRetrieve.add(Field.METRICS); + } + } + } + + @Override + protected FilterList constructFilterListBasedOnFields() { + FilterList list = new FilterList(Operator.MUST_PASS_ONE); + + // By default fetch everything in INFO column family. + FamilyFilter infoColumnFamily = + new FamilyFilter(CompareOp.EQUAL, + new BinaryComparator(FlowRunColumnFamily.INFO.getBytes())); + // Metrics not required. + if (!singleEntityRead && !fieldsToRetrieve.contains(Field.METRICS) && + !fieldsToRetrieve.contains(Field.ALL)) { + FilterList infoColFamilyList = new FilterList(Operator.MUST_PASS_ONE); + infoColFamilyList.addFilter(infoColumnFamily); + infoColFamilyList.addFilter( + new QualifierFilter(CompareOp.NOT_EQUAL, + new BinaryPrefixComparator( + FlowRunColumnPrefix.METRIC.getColumnPrefixBytes("")))); + list.addFilter(infoColFamilyList); + } + if (metricsToRetrieve != null && + !metricsToRetrieve.getFilterList().isEmpty()) { + FilterList infoColFamilyList = new FilterList(); + infoColFamilyList.addFilter(infoColumnFamily); + infoColFamilyList.addFilter(TimelineFilterUtils.createHBaseFilterList( + FlowRunColumnPrefix.METRIC, metricsToRetrieve)); + list.addFilter(infoColFamilyList); + } + return list; + } + + @Override + protected Result getResult(Configuration hbaseConf, Connection conn, + FilterList filterList) throws IOException { + byte[] rowKey = + FlowRunRowKey.getRowKey(clusterId, userId, flowName, flowRunId); + Get get = new Get(rowKey); + get.setMaxVersions(Integer.MAX_VALUE); + if (filterList != null && !filterList.getFilters().isEmpty()) { + get.setFilter(filterList); + } + return table.getResult(hbaseConf, conn, get); + } + + @Override + protected ResultScanner getResults(Configuration hbaseConf, + Connection conn, FilterList filterList) throws IOException { + Scan scan = new Scan(); + scan.setRowPrefixFilter( + FlowRunRowKey.getRowKeyPrefix(clusterId, userId, flowName)); + FilterList newList = new FilterList(); + newList.addFilter(new PageFilter(limit)); + if (filterList != null && !filterList.getFilters().isEmpty()) { + newList.addFilter(filterList); + } + scan.setFilter(newList); + return table.getResultScanner(hbaseConf, conn, scan); + } + + @Override + protected TimelineEntity parseEntity(Result result) throws IOException { + FlowRunEntity flowRun = new FlowRunEntity(); + flowRun.setUser(userId); + flowRun.setName(flowName); + if (singleEntityRead) { + flowRun.setRunId(flowRunId); + } else { + FlowRunRowKey rowKey = FlowRunRowKey.parseRowKey(result.getRow()); + flowRun.setRunId(rowKey.getFlowRunId()); + } + + // read the start time + Long startTime = (Long)FlowRunColumn.MIN_START_TIME.readResult(result); + if (startTime != null) { + flowRun.setStartTime(startTime.longValue()); + } + if (!singleEntityRead && (flowRun.getStartTime() < createdTimeBegin || + flowRun.getStartTime() > createdTimeEnd)) { + return null; + } + + // read the end time if available + Long endTime = (Long)FlowRunColumn.MAX_END_TIME.readResult(result); + if (endTime != null) { + flowRun.setMaxEndTime(endTime.longValue()); + } + + // read the flow version + String version = (String)FlowRunColumn.FLOW_VERSION.readResult(result); + if (version != null) { + flowRun.setVersion(version); + } + + // read metrics + if (singleEntityRead || fieldsToRetrieve.contains(Field.METRICS)) { + readMetrics(flowRun, result, FlowRunColumnPrefix.METRIC); + } + + // set the id + flowRun.setId(flowRun.getId()); + return flowRun; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/61737325/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java new file mode 100644 index 0000000..f3f380c --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java @@ -0,0 +1,497 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage.reader; + +import java.io.IOException; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.filter.BinaryComparator; +import org.apache.hadoop.hbase.filter.BinaryPrefixComparator; +import org.apache.hadoop.hbase.filter.FamilyFilter; +import org.apache.hadoop.hbase.filter.FilterList; +import org.apache.hadoop.hbase.filter.QualifierFilter; +import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; +import org.apache.hadoop.hbase.filter.FilterList.Operator; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; +import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList; +import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterUtils; +import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader; +import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; +import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowColumn; +import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey; +import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; +import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn; +import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnFamily; +import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey; +import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable; + +import com.google.common.base.Preconditions; + +/** + * Timeline entity reader for generic entities that are stored in the entity + * table. + */ +class GenericEntityReader extends TimelineEntityReader { + private static final EntityTable ENTITY_TABLE = new EntityTable(); + private static final Log LOG = LogFactory.getLog(GenericEntityReader.class); + + /** + * Used to look up the flow context. + */ + private final AppToFlowTable appToFlowTable = new AppToFlowTable(); + + public GenericEntityReader(String userId, String clusterId, + String flowName, Long flowRunId, String appId, String entityType, + Long limit, Long createdTimeBegin, Long createdTimeEnd, + Long modifiedTimeBegin, Long modifiedTimeEnd, + Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo, + Map<String, Object> infoFilters, Map<String, String> configFilters, + Set<String> metricFilters, Set<String> eventFilters, + TimelineFilterList confsToRetrieve, TimelineFilterList metricsToRetrieve, + EnumSet<Field> fieldsToRetrieve, boolean sortedKeys) { + super(userId, clusterId, flowName, flowRunId, appId, entityType, limit, + createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd, + relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters, + eventFilters, confsToRetrieve, metricsToRetrieve, fieldsToRetrieve, + sortedKeys); + } + + public GenericEntityReader(String userId, String clusterId, + String flowName, Long flowRunId, String appId, String entityType, + String entityId, TimelineFilterList confsToRetrieve, + TimelineFilterList metricsToRetrieve, EnumSet<Field> fieldsToRetrieve) { + super(userId, clusterId, flowName, flowRunId, appId, entityType, entityId, + confsToRetrieve, metricsToRetrieve, fieldsToRetrieve); + } + + /** + * Uses the {@link EntityTable}. + */ + protected BaseTable<?> getTable() { + return ENTITY_TABLE; + } + + @Override + protected FilterList constructFilterListBasedOnFields() { + FilterList list = new FilterList(Operator.MUST_PASS_ONE); + // Fetch all the columns. + if (fieldsToRetrieve.contains(Field.ALL) && + (confsToRetrieve == null || + confsToRetrieve.getFilterList().isEmpty()) && + (metricsToRetrieve == null || + metricsToRetrieve.getFilterList().isEmpty())) { + return list; + } + FilterList infoColFamilyList = new FilterList(); + // By default fetch everything in INFO column family. + FamilyFilter infoColumnFamily = + new FamilyFilter(CompareOp.EQUAL, + new BinaryComparator(EntityColumnFamily.INFO.getBytes())); + infoColFamilyList.addFilter(infoColumnFamily); + // Events not required. + if (!fieldsToRetrieve.contains(Field.EVENTS) && + !fieldsToRetrieve.contains(Field.ALL) && eventFilters == null) { + infoColFamilyList.addFilter( + new QualifierFilter(CompareOp.NOT_EQUAL, + new BinaryPrefixComparator( + EntityColumnPrefix.EVENT.getColumnPrefixBytes("")))); + } + // info not required. + if (!fieldsToRetrieve.contains(Field.INFO) && + !fieldsToRetrieve.contains(Field.ALL) && infoFilters == null) { + infoColFamilyList.addFilter( + new QualifierFilter(CompareOp.NOT_EQUAL, + new BinaryPrefixComparator( + EntityColumnPrefix.INFO.getColumnPrefixBytes("")))); + } + // is related to not required. + if (!fieldsToRetrieve.contains(Field.IS_RELATED_TO) && + !fieldsToRetrieve.contains(Field.ALL) && isRelatedTo == null) { + infoColFamilyList.addFilter( + new QualifierFilter(CompareOp.NOT_EQUAL, + new BinaryPrefixComparator( + EntityColumnPrefix.IS_RELATED_TO.getColumnPrefixBytes("")))); + } + // relates to not required. + if (!fieldsToRetrieve.contains(Field.RELATES_TO) && + !fieldsToRetrieve.contains(Field.ALL) && relatesTo == null) { + infoColFamilyList.addFilter( + new QualifierFilter(CompareOp.NOT_EQUAL, + new BinaryPrefixComparator( + EntityColumnPrefix.RELATES_TO.getColumnPrefixBytes("")))); + } + list.addFilter(infoColFamilyList); + if ((fieldsToRetrieve.contains(Field.CONFIGS) || configFilters != null) || + (confsToRetrieve != null && + !confsToRetrieve.getFilterList().isEmpty())) { + FilterList filterCfg = + new FilterList(new FamilyFilter(CompareOp.EQUAL, + new BinaryComparator(EntityColumnFamily.CONFIGS.getBytes()))); + if (confsToRetrieve != null && + !confsToRetrieve.getFilterList().isEmpty()) { + filterCfg.addFilter(TimelineFilterUtils.createHBaseFilterList( + EntityColumnPrefix.CONFIG, confsToRetrieve)); + } + list.addFilter(filterCfg); + } + if ((fieldsToRetrieve.contains(Field.METRICS) || metricFilters != null) || + (metricsToRetrieve != null && + !metricsToRetrieve.getFilterList().isEmpty())) { + FilterList filterMetrics = + new FilterList(new FamilyFilter(CompareOp.EQUAL, + new BinaryComparator(EntityColumnFamily.METRICS.getBytes()))); + if (metricsToRetrieve != null && + !metricsToRetrieve.getFilterList().isEmpty()) { + filterMetrics.addFilter(TimelineFilterUtils.createHBaseFilterList( + EntityColumnPrefix.METRIC, metricsToRetrieve)); + } + list.addFilter(filterMetrics); + } + return list; + } + + protected FlowContext lookupFlowContext(String clusterId, String appId, + Configuration hbaseConf, Connection conn) throws IOException { + byte[] rowKey = AppToFlowRowKey.getRowKey(clusterId, appId); + Get get = new Get(rowKey); + Result result = appToFlowTable.getResult(hbaseConf, conn, get); + if (result != null && !result.isEmpty()) { + return new FlowContext( + AppToFlowColumn.USER_ID.readResult(result).toString(), + AppToFlowColumn.FLOW_ID.readResult(result).toString(), + ((Number)AppToFlowColumn.FLOW_RUN_ID.readResult(result)).longValue()); + } else { + throw new IOException( + "Unable to find the context flow ID and flow run ID for clusterId=" + + clusterId + ", appId=" + appId); + } + } + + protected static class FlowContext { + protected final String userId; + protected final String flowName; + protected final Long flowRunId; + public FlowContext(String user, String flowName, Long flowRunId) { + this.userId = user; + this.flowName = flowName; + this.flowRunId = flowRunId; + } + } + + @Override + protected void validateParams() { + Preconditions.checkNotNull(clusterId, "clusterId shouldn't be null"); + Preconditions.checkNotNull(appId, "appId shouldn't be null"); + Preconditions.checkNotNull(entityType, "entityType shouldn't be null"); + if (singleEntityRead) { + Preconditions.checkNotNull(entityId, "entityId shouldn't be null"); + } + } + + @Override + protected void augmentParams(Configuration hbaseConf, Connection conn) + throws IOException { + // In reality all three should be null or neither should be null + if (flowName == null || flowRunId == null || userId == null) { + FlowContext context = + lookupFlowContext(clusterId, appId, hbaseConf, conn); + flowName = context.flowName; + flowRunId = context.flowRunId; + userId = context.userId; + } + if (fieldsToRetrieve == null) { + fieldsToRetrieve = EnumSet.noneOf(Field.class); + } + if (!fieldsToRetrieve.contains(Field.CONFIGS) && + confsToRetrieve != null && !confsToRetrieve.getFilterList().isEmpty()) { + fieldsToRetrieve.add(Field.CONFIGS); + } + if (!fieldsToRetrieve.contains(Field.METRICS) && + metricsToRetrieve != null && + !metricsToRetrieve.getFilterList().isEmpty()) { + fieldsToRetrieve.add(Field.METRICS); + } + if (!singleEntityRead) { + if (limit == null || limit < 0) { + limit = TimelineReader.DEFAULT_LIMIT; + } + if (createdTimeBegin == null) { + createdTimeBegin = DEFAULT_BEGIN_TIME; + } + if (createdTimeEnd == null) { + createdTimeEnd = DEFAULT_END_TIME; + } + if (modifiedTimeBegin == null) { + modifiedTimeBegin = DEFAULT_BEGIN_TIME; + } + if (modifiedTimeEnd == null) { + modifiedTimeEnd = DEFAULT_END_TIME; + } + } + } + + @Override + protected Result getResult(Configuration hbaseConf, Connection conn, + FilterList filterList) throws IOException { + byte[] rowKey = + EntityRowKey.getRowKey(clusterId, userId, flowName, flowRunId, appId, + entityType, entityId); + Get get = new Get(rowKey); + get.setMaxVersions(Integer.MAX_VALUE); + if (filterList != null && !filterList.getFilters().isEmpty()) { + get.setFilter(filterList); + } + return table.getResult(hbaseConf, conn, get); + } + + @Override + protected ResultScanner getResults(Configuration hbaseConf, + Connection conn, FilterList filterList) throws IOException { + // Scan through part of the table to find the entities belong to one app + // and one type + Scan scan = new Scan(); + scan.setRowPrefixFilter(EntityRowKey.getRowKeyPrefix( + clusterId, userId, flowName, flowRunId, appId, entityType)); + scan.setMaxVersions(Integer.MAX_VALUE); + if (filterList != null && !filterList.getFilters().isEmpty()) { + scan.setFilter(filterList); + } + return table.getResultScanner(hbaseConf, conn, scan); + } + + @Override + protected TimelineEntity parseEntity(Result result) throws IOException { + if (result == null || result.isEmpty()) { + return null; + } + TimelineEntity entity = new TimelineEntity(); + String entityType = EntityColumn.TYPE.readResult(result).toString(); + entity.setType(entityType); + String entityId = EntityColumn.ID.readResult(result).toString(); + entity.setId(entityId); + + // fetch created time + Number createdTime = (Number)EntityColumn.CREATED_TIME.readResult(result); + entity.setCreatedTime(createdTime.longValue()); + if (!singleEntityRead && (entity.getCreatedTime() < createdTimeBegin || + entity.getCreatedTime() > createdTimeEnd)) { + return null; + } + + // fetch modified time + Number modifiedTime = (Number)EntityColumn.MODIFIED_TIME.readResult(result); + entity.setModifiedTime(modifiedTime.longValue()); + if (!singleEntityRead && (entity.getModifiedTime() < modifiedTimeBegin || + entity.getModifiedTime() > modifiedTimeEnd)) { + return null; + } + + // fetch is related to entities + boolean checkIsRelatedTo = isRelatedTo != null && isRelatedTo.size() > 0; + if (fieldsToRetrieve.contains(Field.ALL) || + fieldsToRetrieve.contains(Field.IS_RELATED_TO) || checkIsRelatedTo) { + readRelationship(entity, result, EntityColumnPrefix.IS_RELATED_TO, true); + if (checkIsRelatedTo && !TimelineStorageUtils.matchRelations( + entity.getIsRelatedToEntities(), isRelatedTo)) { + return null; + } + if (!fieldsToRetrieve.contains(Field.ALL) && + !fieldsToRetrieve.contains(Field.IS_RELATED_TO)) { + entity.getIsRelatedToEntities().clear(); + } + } + + // fetch relates to entities + boolean checkRelatesTo = relatesTo != null && relatesTo.size() > 0; + if (fieldsToRetrieve.contains(Field.ALL) || + fieldsToRetrieve.contains(Field.RELATES_TO) || checkRelatesTo) { + readRelationship(entity, result, EntityColumnPrefix.RELATES_TO, false); + if (checkRelatesTo && !TimelineStorageUtils.matchRelations( + entity.getRelatesToEntities(), relatesTo)) { + return null; + } + if (!fieldsToRetrieve.contains(Field.ALL) && + !fieldsToRetrieve.contains(Field.RELATES_TO)) { + entity.getRelatesToEntities().clear(); + } + } + + // fetch info + boolean checkInfo = infoFilters != null && infoFilters.size() > 0; + if (fieldsToRetrieve.contains(Field.ALL) || + fieldsToRetrieve.contains(Field.INFO) || checkInfo) { + readKeyValuePairs(entity, result, EntityColumnPrefix.INFO, false); + if (checkInfo && + !TimelineStorageUtils.matchFilters(entity.getInfo(), infoFilters)) { + return null; + } + if (!fieldsToRetrieve.contains(Field.ALL) && + !fieldsToRetrieve.contains(Field.INFO)) { + entity.getInfo().clear(); + } + } + + // fetch configs + boolean checkConfigs = configFilters != null && configFilters.size() > 0; + if (fieldsToRetrieve.contains(Field.ALL) || + fieldsToRetrieve.contains(Field.CONFIGS) || checkConfigs) { + readKeyValuePairs(entity, result, EntityColumnPrefix.CONFIG, true); + if (checkConfigs && !TimelineStorageUtils.matchFilters( + entity.getConfigs(), configFilters)) { + return null; + } + if (!fieldsToRetrieve.contains(Field.ALL) && + !fieldsToRetrieve.contains(Field.CONFIGS)) { + entity.getConfigs().clear(); + } + } + + // fetch events + boolean checkEvents = eventFilters != null && eventFilters.size() > 0; + if (fieldsToRetrieve.contains(Field.ALL) || + fieldsToRetrieve.contains(Field.EVENTS) || checkEvents) { + readEvents(entity, result, false); + if (checkEvents && !TimelineStorageUtils.matchEventFilters( + entity.getEvents(), eventFilters)) { + return null; + } + if (!fieldsToRetrieve.contains(Field.ALL) && + !fieldsToRetrieve.contains(Field.EVENTS)) { + entity.getEvents().clear(); + } + } + + // fetch metrics + boolean checkMetrics = metricFilters != null && metricFilters.size() > 0; + if (fieldsToRetrieve.contains(Field.ALL) || + fieldsToRetrieve.contains(Field.METRICS) || checkMetrics) { + readMetrics(entity, result, EntityColumnPrefix.METRIC); + if (checkMetrics && !TimelineStorageUtils.matchMetricFilters( + entity.getMetrics(), metricFilters)) { + return null; + } + if (!fieldsToRetrieve.contains(Field.ALL) && + !fieldsToRetrieve.contains(Field.METRICS)) { + entity.getMetrics().clear(); + } + } + return entity; + } + + /** + * Helper method for reading relationship. + */ + protected <T> void readRelationship( + TimelineEntity entity, Result result, ColumnPrefix<T> prefix, + boolean isRelatedTo) throws IOException { + // isRelatedTo and relatesTo are of type Map<String, Set<String>> + Map<String, Object> columns = prefix.readResults(result); + for (Map.Entry<String, Object> column : columns.entrySet()) { + for (String id : Separator.VALUES.splitEncoded( + column.getValue().toString())) { + if (isRelatedTo) { + entity.addIsRelatedToEntity(column.getKey(), id); + } else { + entity.addRelatesToEntity(column.getKey(), id); + } + } + } + } + + /** + * Helper method for reading key-value pairs for either info or config. + */ + protected <T> void readKeyValuePairs( + TimelineEntity entity, Result result, ColumnPrefix<T> prefix, + boolean isConfig) throws IOException { + // info and configuration are of type Map<String, Object or String> + Map<String, Object> columns = prefix.readResults(result); + if (isConfig) { + for (Map.Entry<String, Object> column : columns.entrySet()) { + entity.addConfig(column.getKey(), column.getValue().toString()); + } + } else { + entity.addInfo(columns); + } + } + + /** + * Read events from the entity table or the application table. The column name + * is of the form "eventId=timestamp=infoKey" where "infoKey" may be omitted + * if there is no info associated with the event. + * + * See {@link EntityTable} and {@link ApplicationTable} for a more detailed + * schema description. + */ + protected void readEvents(TimelineEntity entity, Result result, + boolean isApplication) throws IOException { + Map<String, TimelineEvent> eventsMap = new HashMap<>(); + Map<?, Object> eventsResult = isApplication ? + ApplicationColumnPrefix.EVENT. + readResultsHavingCompoundColumnQualifiers(result) : + EntityColumnPrefix.EVENT. + readResultsHavingCompoundColumnQualifiers(result); + for (Map.Entry<?, Object> eventResult : eventsResult.entrySet()) { + byte[][] karr = (byte[][])eventResult.getKey(); + // the column name is of the form "eventId=timestamp=infoKey" + if (karr.length == 3) { + String id = Bytes.toString(karr[0]); + long ts = TimelineStorageUtils.invertLong(Bytes.toLong(karr[1])); + String key = Separator.VALUES.joinEncoded(id, Long.toString(ts)); + TimelineEvent event = eventsMap.get(key); + if (event == null) { + event = new TimelineEvent(); + event.setId(id); + event.setTimestamp(ts); + eventsMap.put(key, event); + } + // handle empty info + String infoKey = karr[2].length == 0 ? null : Bytes.toString(karr[2]); + if (infoKey != null) { + event.addInfo(infoKey, eventResult.getValue()); + } + } else { + LOG.warn("incorrectly formatted column name: it will be discarded"); + continue; + } + } + Set<TimelineEvent> eventsSet = new HashSet<>(eventsMap.values()); + entity.addEvents(eventsSet); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/61737325/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java new file mode 100644 index 0000000..e801466 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java @@ -0,0 +1,274 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage.reader; + +import java.io.IOException; +import java.util.EnumSet; +import java.util.Map; +import java.util.NavigableMap; +import java.util.NavigableSet; +import java.util.Set; +import java.util.TreeSet; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.filter.FilterList; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; +import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList; +import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix; + +/** + * The base class for reading and deserializing timeline entities from the + * HBase storage. Different types can be defined for different types of the + * entities that are being requested. + */ +public abstract class TimelineEntityReader { + private static final Log LOG = LogFactory.getLog(TimelineEntityReader.class); + protected static final long DEFAULT_BEGIN_TIME = 0L; + protected static final long DEFAULT_END_TIME = Long.MAX_VALUE; + + protected final boolean singleEntityRead; + + protected String userId; + protected String clusterId; + protected String flowName; + protected Long flowRunId; + protected String appId; + protected String entityType; + protected EnumSet<Field> fieldsToRetrieve; + // used only for a single entity read mode + protected String entityId; + // used only for multiple entity read mode + protected Long limit; + protected Long createdTimeBegin; + protected Long createdTimeEnd; + protected Long modifiedTimeBegin; + protected Long modifiedTimeEnd; + protected Map<String, Set<String>> relatesTo; + protected Map<String, Set<String>> isRelatedTo; + protected Map<String, Object> infoFilters; + protected Map<String, String> configFilters; + protected Set<String> metricFilters; + protected Set<String> eventFilters; + protected TimelineFilterList confsToRetrieve; + protected TimelineFilterList metricsToRetrieve; + + /** + * Main table the entity reader uses. + */ + protected BaseTable<?> table; + + /** + * Specifies whether keys for this table are sorted in a manner where entities + * can be retrieved by created time. If true, it will be sufficient to collect + * the first results as specified by the limit. Otherwise all matched entities + * will be fetched and then limit applied. + */ + private boolean sortedKeys = false; + + /** + * Instantiates a reader for multiple-entity reads. + */ + protected TimelineEntityReader(String userId, String clusterId, + String flowName, Long flowRunId, String appId, String entityType, + Long limit, Long createdTimeBegin, Long createdTimeEnd, + Long modifiedTimeBegin, Long modifiedTimeEnd, + Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo, + Map<String, Object> infoFilters, Map<String, String> configFilters, + Set<String> metricFilters, Set<String> eventFilters, + TimelineFilterList confsToRetrieve, TimelineFilterList metricsToRetrieve, + EnumSet<Field> fieldsToRetrieve, boolean sortedKeys) { + this.singleEntityRead = false; + this.sortedKeys = sortedKeys; + this.userId = userId; + this.clusterId = clusterId; + this.flowName = flowName; + this.flowRunId = flowRunId; + this.appId = appId; + this.entityType = entityType; + this.fieldsToRetrieve = fieldsToRetrieve; + this.limit = limit; + this.createdTimeBegin = createdTimeBegin; + this.createdTimeEnd = createdTimeEnd; + this.modifiedTimeBegin = modifiedTimeBegin; + this.modifiedTimeEnd = modifiedTimeEnd; + this.relatesTo = relatesTo; + this.isRelatedTo = isRelatedTo; + this.infoFilters = infoFilters; + this.configFilters = configFilters; + this.metricFilters = metricFilters; + this.eventFilters = eventFilters; + this.confsToRetrieve = confsToRetrieve; + this.metricsToRetrieve = metricsToRetrieve; + + this.table = getTable(); + } + + /** + * Instantiates a reader for single-entity reads. + */ + protected TimelineEntityReader(String userId, String clusterId, + String flowName, Long flowRunId, String appId, String entityType, + String entityId, TimelineFilterList confsToRetrieve, + TimelineFilterList metricsToRetrieve, EnumSet<Field> fieldsToRetrieve) { + this.singleEntityRead = true; + this.userId = userId; + this.clusterId = clusterId; + this.flowName = flowName; + this.flowRunId = flowRunId; + this.appId = appId; + this.entityType = entityType; + this.fieldsToRetrieve = fieldsToRetrieve; + this.entityId = entityId; + this.confsToRetrieve = confsToRetrieve; + this.metricsToRetrieve = metricsToRetrieve; + + this.table = getTable(); + } + + /** + * Creates a {@link FilterList} based on fields, confs and metrics to + * retrieve. This filter list will be set in Scan/Get objects to trim down + * results fetched from HBase back-end storage. + * @return a {@link FilterList} object. + */ + protected abstract FilterList constructFilterListBasedOnFields(); + + /** + * Reads and deserializes a single timeline entity from the HBase storage. + */ + public TimelineEntity readEntity(Configuration hbaseConf, Connection conn) + throws IOException { + validateParams(); + augmentParams(hbaseConf, conn); + + FilterList filterList = constructFilterListBasedOnFields(); + Result result = getResult(hbaseConf, conn, filterList); + if (result == null || result.isEmpty()) { + // Could not find a matching row. + LOG.info("Cannot find matching entity of type " + entityType); + return null; + } + return parseEntity(result); + } + + /** + * Reads and deserializes a set of timeline entities from the HBase storage. + * It goes through all the results available, and returns the number of + * entries as specified in the limit in the entity's natural sort order. + */ + public Set<TimelineEntity> readEntities(Configuration hbaseConf, + Connection conn) throws IOException { + validateParams(); + augmentParams(hbaseConf, conn); + + NavigableSet<TimelineEntity> entities = new TreeSet<>(); + FilterList filterList = constructFilterListBasedOnFields(); + ResultScanner results = getResults(hbaseConf, conn, filterList); + try { + for (Result result : results) { + TimelineEntity entity = parseEntity(result); + if (entity == null) { + continue; + } + entities.add(entity); + if (!sortedKeys) { + if (entities.size() > limit) { + entities.pollLast(); + } + } else { + if (entities.size() == limit) { + break; + } + } + } + return entities; + } finally { + results.close(); + } + } + + /** + * Returns the main table to be used by the entity reader. + */ + protected abstract BaseTable<?> getTable(); + + /** + * Validates the required parameters to read the entities. + */ + protected abstract void validateParams(); + + /** + * Sets certain parameters to defaults if the values are not provided. + */ + protected abstract void augmentParams(Configuration hbaseConf, + Connection conn) throws IOException; + + /** + * Fetches a {@link Result} instance for a single-entity read. + * + * @return the {@link Result} instance or null if no such record is found. + */ + protected abstract Result getResult(Configuration hbaseConf, Connection conn, + FilterList filterList) throws IOException; + + /** + * Fetches a {@link ResultScanner} for a multi-entity read. + */ + protected abstract ResultScanner getResults(Configuration hbaseConf, + Connection conn, FilterList filterList) throws IOException; + + /** + * Given a {@link Result} instance, deserializes and creates a + * {@link TimelineEntity}. + * + * @return the {@link TimelineEntity} instance, or null if the {@link Result} + * is null or empty. + */ + protected abstract TimelineEntity parseEntity(Result result) + throws IOException; + + /** + * Helper method for reading and deserializing {@link TimelineMetric} objects + * using the specified column prefix. The timeline metrics then are added to + * the given timeline entity. + */ + protected void readMetrics(TimelineEntity entity, Result result, + ColumnPrefix<?> columnPrefix) throws IOException { + NavigableMap<String, NavigableMap<Long, Number>> metricsResult = + columnPrefix.readResultsWithTimestamps(result); + for (Map.Entry<String, NavigableMap<Long, Number>> metricResult: + metricsResult.entrySet()) { + TimelineMetric metric = new TimelineMetric(); + metric.setId(metricResult.getKey()); + // Simply assume that if the value set contains more than 1 elements, the + // metric is a TIME_SERIES metric, otherwise, it's a SINGLE_VALUE metric + metric.setType(metricResult.getValue().size() > 1 ? + TimelineMetric.Type.TIME_SERIES : TimelineMetric.Type.SINGLE_VALUE); + metric.addValues(metricResult.getValue()); + entity.addMetric(metric); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/61737325/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReaderFactory.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReaderFactory.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReaderFactory.java new file mode 100644 index 0000000..c77897a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReaderFactory.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage.reader; + +import java.util.EnumSet; +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; +import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList; +import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; + +/** + * Factory methods for instantiating a timeline entity reader. + */ +public class TimelineEntityReaderFactory { + /** + * Creates a timeline entity reader instance for reading a single entity with + * the specified input. + */ + public static TimelineEntityReader createSingleEntityReader(String userId, + String clusterId, String flowName, Long flowRunId, String appId, + String entityType, String entityId, TimelineFilterList confs, + TimelineFilterList metrics, EnumSet<Field> fieldsToRetrieve) { + // currently the types that are handled separate from the generic entity + // table are application, flow run, and flow activity entities + if (TimelineEntityType.YARN_APPLICATION.matches(entityType)) { + return new ApplicationEntityReader(userId, clusterId, flowName, flowRunId, + appId, entityType, entityId, confs, metrics, fieldsToRetrieve); + } else if (TimelineEntityType.YARN_FLOW_RUN.matches(entityType)) { + return new FlowRunEntityReader(userId, clusterId, flowName, flowRunId, + appId, entityType, entityId, confs, metrics, fieldsToRetrieve); + } else if (TimelineEntityType.YARN_FLOW_ACTIVITY.matches(entityType)) { + return new FlowActivityEntityReader(userId, clusterId, flowName, flowRunId, + appId, entityType, entityId, fieldsToRetrieve); + } else { + // assume we're dealing with a generic entity read + return new GenericEntityReader(userId, clusterId, flowName, flowRunId, + appId, entityType, entityId, confs, metrics, fieldsToRetrieve); + } + } + + /** + * Creates a timeline entity reader instance for reading set of entities with + * the specified input and predicates. + */ + public static TimelineEntityReader createMultipleEntitiesReader(String userId, + String clusterId, String flowName, Long flowRunId, String appId, + String entityType, Long limit, Long createdTimeBegin, Long createdTimeEnd, + Long modifiedTimeBegin, Long modifiedTimeEnd, + Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo, + Map<String, Object> infoFilters, Map<String, String> configFilters, + Set<String> metricFilters, Set<String> eventFilters, + TimelineFilterList confs, TimelineFilterList metrics, + EnumSet<Field> fieldsToRetrieve) { + // currently the types that are handled separate from the generic entity + // table are application, flow run, and flow activity entities + if (TimelineEntityType.YARN_APPLICATION.matches(entityType)) { + return new ApplicationEntityReader(userId, clusterId, flowName, flowRunId, + appId, entityType, limit, createdTimeBegin, createdTimeEnd, + modifiedTimeBegin, modifiedTimeEnd, relatesTo, isRelatedTo, + infoFilters, configFilters, metricFilters, eventFilters, confs, + metrics, fieldsToRetrieve); + } else if (TimelineEntityType.YARN_FLOW_ACTIVITY.matches(entityType)) { + return new FlowActivityEntityReader(userId, clusterId, flowName, flowRunId, + appId, entityType, limit, createdTimeBegin, createdTimeEnd, + modifiedTimeBegin, modifiedTimeEnd, relatesTo, isRelatedTo, + infoFilters, configFilters, metricFilters, eventFilters, + fieldsToRetrieve); + } else if (TimelineEntityType.YARN_FLOW_RUN.matches(entityType)) { + return new FlowRunEntityReader(userId, clusterId, flowName, flowRunId, + appId, entityType, limit, createdTimeBegin, createdTimeEnd, + modifiedTimeBegin, modifiedTimeEnd, relatesTo, isRelatedTo, + infoFilters, configFilters, metricFilters, eventFilters, confs, + metrics, fieldsToRetrieve); + } else { + // assume we're dealing with a generic entity read + return new GenericEntityReader(userId, clusterId, flowName, flowRunId, + appId, entityType, limit, createdTimeBegin, createdTimeEnd, + modifiedTimeBegin, modifiedTimeEnd, relatesTo, isRelatedTo, + infoFilters, configFilters, metricFilters, eventFilters, confs, + metrics, fieldsToRetrieve, false); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/61737325/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/package-info.java new file mode 100644 index 0000000..0b3fa38 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/package-info.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +package org.apache.hadoop.yarn.server.timelineservice.storage.reader; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org