http://git-wip-us.apache.org/repos/asf/hadoop/blob/b01514f6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java deleted file mode 100644 index 9ba5e38..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java +++ /dev/null @@ -1,163 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.yarn.server.timelineservice.storage.reader; - -import java.io.IOException; -import java.util.Map; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.filter.FilterList; -import org.apache.hadoop.hbase.filter.PageFilter; -import org.apache.hadoop.yarn.api.records.timelineservice.FlowActivityEntity; -import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; -import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve; -import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters; -import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongKeyConverter; -import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityColumnPrefix; -import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKey; -import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKeyPrefix; -import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTable; - -import com.google.common.base.Preconditions; - 
/**
 * Timeline entity reader for flow activity entities that are stored in the
 * flow activity table. Only scans over multiple entities are supported; a
 * single entity lookup through {@link #getResult} is rejected.
 */
class FlowActivityEntityReader extends TimelineEntityReader {
  private static final FlowActivityTable FLOW_ACTIVITY_TABLE =
      new FlowActivityTable();

  /**
   * Converts the Long components of a key to and from their storage format.
   */
  private final KeyConverter<Long> longKeyConverter = new LongKeyConverter();

  /**
   * Creates a reader that is given entity filters in addition to the context
   * and the data to retrieve.
   *
   * @param ctxt reader context; its cluster id is required by
   *     {@link #validateParams()}.
   * @param entityFilters filters (created time range, limit) applied to the
   *     scan.
   * @param toRetrieve description of the data to retrieve.
   */
  public FlowActivityEntityReader(TimelineReaderContext ctxt,
      TimelineEntityFilters entityFilters, TimelineDataToRetrieve toRetrieve) {
    // NOTE(review): the trailing "true" is presumably the sortedKeys flag of
    // the parent constructor -- confirm against TimelineEntityReader.
    super(ctxt, entityFilters, toRetrieve, true);
  }

  /**
   * Creates a reader from a context and the data to retrieve only.
   *
   * @param ctxt reader context.
   * @param toRetrieve description of the data to retrieve.
   */
  public FlowActivityEntityReader(TimelineReaderContext ctxt,
      TimelineDataToRetrieve toRetrieve) {
    super(ctxt, toRetrieve);
  }

  /**
   * Uses the {@link FlowActivityTable}.
   */
  @Override
  protected BaseTable<?> getTable() {
    return FLOW_ACTIVITY_TABLE;
  }

  /**
   * Checks that the context carries a cluster id, the only mandatory
   * parameter for a flow activity query.
   */
  @Override
  protected void validateParams() {
    Preconditions.checkNotNull(getContext().getClusterId(),
        "clusterId shouldn't be null");
  }

  /**
   * Makes sure a filters object exists before the scan is built.
   */
  @Override
  protected void augmentParams(Configuration hbaseConf, Connection conn)
      throws IOException {
    createFiltersIfNull();
  }

  /**
   * No filter list is derived from the entity filters for this reader.
   *
   * @return always {@code null}.
   */
  @Override
  protected FilterList constructFilterListBasedOnFilters() throws IOException {
    return null;
  }

  /**
   * No filter list is derived from the fields to retrieve for this reader.
   *
   * @return always {@code null}.
   */
  @Override
  protected FilterList constructFilterListBasedOnFields() {
    return null;
  }

  /**
   * Single entity reads are not available for flow activities.
   *
   * @throws UnsupportedOperationException always.
   */
  @Override
  protected Result getResult(Configuration hbaseConf, Connection conn,
      FilterList filterList) throws IOException {
    throw new UnsupportedOperationException(
        "we don't support a single entity query");
  }

  /**
   * Scans the flow activity table for the cluster in the context, restricted
   * to the created time range of the filters when one is set.
   *
   * @param hbaseConf HBase configuration handed to the table.
   * @param conn HBase connection handed to the table.
   * @param filterList not used by this reader.
   * @return scanner over the matching rows.
   * @throws IOException if the scanner cannot be obtained.
   */
  @Override
  protected ResultScanner getResults(Configuration hbaseConf,
      Connection conn, FilterList filterList) throws IOException {
    Scan scan = new Scan();
    String clusterId = getContext().getClusterId();
    TimelineEntityFilters entityFilters = getFilters();
    boolean boundedByTime = entityFilters.getCreatedTimeBegin() != 0L
        || entityFilters.getCreatedTimeEnd() != Long.MAX_VALUE;
    if (boundedByTime) {
      // The scan starts at the end of the time range and stops just before
      // its beginning.
      // NOTE(review): presumably the row key stores the day timestamp
      // inverted, which is why end comes first -- confirm.
      scan.setStartRow(new FlowActivityRowKeyPrefix(clusterId,
          entityFilters.getCreatedTimeEnd()).getRowKeyPrefix());
      long stopTime;
      if (entityFilters.getCreatedTimeBegin() <= 0) {
        stopTime = 0;
      } else {
        stopTime = entityFilters.getCreatedTimeBegin() - 1;
      }
      scan.setStopRow(
          new FlowActivityRowKeyPrefix(clusterId, stopTime).getRowKeyPrefix());
    } else {
      // No time restriction: every record of the cluster has to be chosen.
      scan.setRowPrefixFilter(
          new FlowActivityRowKeyPrefix(clusterId).getRowKeyPrefix());
    }
    // use the page filter to limit the result to the page size
    // the scanner may still return more than the limit; therefore we need to
    // read the right number as we iterate
    scan.setFilter(new PageFilter(entityFilters.getLimit()));
    return getTable().getResultScanner(hbaseConf, conn, scan);
  }

  /**
   * Builds a {@link FlowActivityEntity} from one row: day timestamp, user and
   * flow name come from the row key, and one {@link FlowRunEntity} is added
   * per run id column found in the row.
   *
   * @param result row read from the flow activity table.
   * @return the flow activity entity for that row.
   * @throws IOException if the run id columns cannot be read.
   */
  @Override
  protected TimelineEntity parseEntity(Result result) throws IOException {
    FlowActivityRowKey key = FlowActivityRowKey.parseRowKey(result.getRow());
    Long dayTimestamp = key.getDayTimestamp();
    String userId = key.getUserId();
    String flow = key.getFlowName();

    FlowActivityEntity activity = new FlowActivityEntity(
        getContext().getClusterId(), dayTimestamp, userId, flow);
    // set the id
    activity.setId(activity.getId());

    // run ids, each mapped to the flow version, associated with this flow on
    // this day
    Map<Long, Object> versionsByRunId =
        FlowActivityColumnPrefix.RUN_ID.readResults(result, longKeyConverter);
    for (Map.Entry<Long, Object> entry : versionsByRunId.entrySet()) {
      activity.addFlowRun(createFlowRun(userId, flow, entry.getKey(),
          (String) entry.getValue()));
    }
    return activity;
  }

  /**
   * Creates the flow run entity for one run id column of a row.
   */
  private FlowRunEntity createFlowRun(String userId, String flow, Long runId,
      String version) {
    FlowRunEntity run = new FlowRunEntity();
    run.setUser(userId);
    run.setName(flow);
    run.setRunId(runId);
    run.setVersion(version);
    // set the id
    run.setId(run.getId());
    return run;
  }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b01514f6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java deleted file mode 100644 index 986a28f..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java +++ /dev/null @@ -1,269 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.yarn.server.timelineservice.storage.reader; - -import java.io.IOException; -import java.util.EnumSet; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.filter.BinaryComparator; -import org.apache.hadoop.hbase.filter.BinaryPrefixComparator; -import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; -import org.apache.hadoop.hbase.filter.FamilyFilter; -import org.apache.hadoop.hbase.filter.FilterList; -import org.apache.hadoop.hbase.filter.FilterList.Operator; -import org.apache.hadoop.hbase.filter.PageFilter; -import org.apache.hadoop.hbase.filter.QualifierFilter; -import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; -import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve; -import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters; -import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext; -import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList; -import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterUtils; -import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.RowKeyPrefix; -import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumn; -import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnFamily; -import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnPrefix; -import 
org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKey; -import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKeyPrefix; -import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable; -import org.apache.hadoop.yarn.webapp.BadRequestException; - -import com.google.common.base.Preconditions; - 
/**
 * Timeline entity reader for flow run entities that are stored in the flow run
 * table. It can read a single flow run (looked up by its row key) or scan all
 * runs that share the cluster, user and flow name of the context.
 */
class FlowRunEntityReader extends TimelineEntityReader {
  private static final FlowRunTable FLOW_RUN_TABLE = new FlowRunTable();

  /**
   * Creates a reader that is given entity filters in addition to the context
   * and the data to retrieve.
   *
   * @param ctxt reader context (cluster, user, flow name).
   * @param entityFilters filters (created time range, metric filters, limit).
   * @param toRetrieve fields and metrics to retrieve.
   */
  public FlowRunEntityReader(TimelineReaderContext ctxt,
      TimelineEntityFilters entityFilters, TimelineDataToRetrieve toRetrieve) {
    // NOTE(review): the trailing "true" is presumably the sortedKeys flag of
    // the parent constructor -- confirm against TimelineEntityReader.
    super(ctxt, entityFilters, toRetrieve, true);
  }

  /**
   * Creates a reader from a context and the data to retrieve only.
   *
   * @param ctxt reader context (cluster, user, flow name, flow run id).
   * @param toRetrieve fields and metrics to retrieve.
   */
  public FlowRunEntityReader(TimelineReaderContext ctxt,
      TimelineDataToRetrieve toRetrieve) {
    super(ctxt, toRetrieve);
  }

  /**
   * Uses the {@link FlowRunTable}.
   */
  @Override
  protected BaseTable<?> getTable() {
    return FLOW_RUN_TABLE;
  }

  /**
   * Validates the query parameters. Context, data to retrieve, cluster id,
   * user id and flow name are always required; the flow run id is required
   * for a single entity read. When several flow runs are queried only the
   * fields ALL and METRICS may be requested.
   *
   * @throws NullPointerException if a required parameter is missing.
   * @throws BadRequestException if an unsupported field is requested while
   *     querying multiple flow runs.
   */
  @Override
  protected void validateParams() {
    Preconditions.checkNotNull(getContext(), "context shouldn't be null");
    Preconditions.checkNotNull(getDataToRetrieve(),
        "data to retrieve shouldn't be null");
    Preconditions.checkNotNull(getContext().getClusterId(),
        "clusterId shouldn't be null");
    Preconditions.checkNotNull(getContext().getUserId(),
        "userId shouldn't be null");
    Preconditions.checkNotNull(getContext().getFlowName(),
        "flowName shouldn't be null");
    if (isSingleEntityRead()) {
      Preconditions.checkNotNull(getContext().getFlowRunId(),
          "flowRunId shouldn't be null");
    }
    EnumSet<Field> fieldsToRetrieve = getDataToRetrieve().getFieldsToRetrieve();
    if (!isSingleEntityRead() && fieldsToRetrieve != null) {
      for (Field field : fieldsToRetrieve) {
        if (field != Field.ALL && field != Field.METRICS) {
          throw new BadRequestException("Invalid field " + field
              + " specified while querying flow runs.");
        }
      }
    }
  }

  /**
   * Completes the query parameters before the filters are built.
   */
  @Override
  protected void augmentParams(Configuration hbaseConf, Connection conn) {
    // Add metrics to fields to retrieve if metricsToRetrieve is specified.
    getDataToRetrieve().addFieldsBasedOnConfsAndMetricsToRetrieve();
    if (!isSingleEntityRead()) {
      createFiltersIfNull();
    }
  }

  /**
   * Builds the filter list that restricts which flow runs match: a created
   * time range on the minimum start time column and, when present, the metric
   * filters.
   *
   * @return filter list based on the entity filters; empty if neither a time
   *     range nor metric filters are specified.
   * @throws IOException if a filter cannot be created.
   */
  @Override
  protected FilterList constructFilterListBasedOnFilters() throws IOException {
    FilterList listBasedOnFilters = new FilterList();
    // Filter based on created time range. Primitive locals avoid boxing the
    // values just to compare them.
    long createdTimeBegin = getFilters().getCreatedTimeBegin();
    long createdTimeEnd = getFilters().getCreatedTimeEnd();
    if (createdTimeBegin != 0 || createdTimeEnd != Long.MAX_VALUE) {
      listBasedOnFilters.addFilter(TimelineFilterUtils
          .createSingleColValueFiltersByRange(FlowRunColumn.MIN_START_TIME,
              createdTimeBegin, createdTimeEnd));
    }
    // Filter based on metric filters.
    TimelineFilterList metricFilters = getFilters().getMetricFilters();
    if (metricFilters != null && !metricFilters.getFilterList().isEmpty()) {
      listBasedOnFilters.addFilter(TimelineFilterUtils.createHBaseFilterList(
          FlowRunColumnPrefix.METRIC, metricFilters));
    }
    return listBasedOnFilters;
  }

  /**
   * Add {@link QualifierFilter} filters to filter list for each column of flow
   * run table.
   *
   * @return filter list to which qualifier filters have been added.
   */
  private FilterList updateFixedColumns() {
    FilterList columnsList = new FilterList(Operator.MUST_PASS_ONE);
    for (FlowRunColumn column : FlowRunColumn.values()) {
      columnsList.addFilter(new QualifierFilter(CompareOp.EQUAL,
          new BinaryComparator(column.getColumnQualifierBytes())));
    }
    return columnsList;
  }

  /**
   * Builds the filter list that restricts which columns are fetched. When
   * several flow runs are read and metrics are not requested, metric columns
   * are excluded; otherwise, if specific metrics to retrieve are given, only
   * the fixed flow run columns and those metrics are fetched.
   *
   * @return filter list based on the fields to retrieve; empty if everything
   *     in the info column family has to be fetched.
   * @throws IOException if a filter cannot be created.
   */
  @Override
  protected FilterList constructFilterListBasedOnFields() throws IOException {
    FilterList list = new FilterList(Operator.MUST_PASS_ONE);
    // By default fetch everything in INFO column family.
    FamilyFilter infoColumnFamily =
        new FamilyFilter(CompareOp.EQUAL, new BinaryComparator(
            FlowRunColumnFamily.INFO.getBytes()));
    TimelineDataToRetrieve dataToRetrieve = getDataToRetrieve();
    // If multiple entities have to be retrieved, check if metrics have to be
    // retrieved and if not, add a filter so that metrics can be excluded.
    // Metrics are always returned if we are reading a single entity.
    if (!isSingleEntityRead()
        && !hasField(dataToRetrieve.getFieldsToRetrieve(), Field.METRICS)) {
      // A cell must be in the info column family AND must not carry the
      // metric prefix. With MUST_PASS_ONE the family filter alone would let
      // every info cell through, so metrics would never be excluded.
      FilterList infoColFamilyList = new FilterList(Operator.MUST_PASS_ALL);
      infoColFamilyList.addFilter(infoColumnFamily);
      infoColFamilyList.addFilter(new QualifierFilter(CompareOp.NOT_EQUAL,
          new BinaryPrefixComparator(FlowRunColumnPrefix.METRIC
              .getColumnPrefixBytes(""))));
      list.addFilter(infoColFamilyList);
    } else {
      // Check if metricsToRetrieve are specified and if they are, create a
      // filter list for info column family by adding flow run tables columns
      // and a list for metrics to retrieve. Pls note that fieldsToRetrieve
      // will have METRICS added to it if metricsToRetrieve are specified
      // (in augmentParams()).
      TimelineFilterList metricsToRetrieve =
          dataToRetrieve.getMetricsToRetrieve();
      if (metricsToRetrieve != null
          && !metricsToRetrieve.getFilterList().isEmpty()) {
        FilterList infoColFamilyList = new FilterList();
        infoColFamilyList.addFilter(infoColumnFamily);
        FilterList columnsList = updateFixedColumns();
        columnsList.addFilter(TimelineFilterUtils.createHBaseFilterList(
            FlowRunColumnPrefix.METRIC, metricsToRetrieve));
        infoColFamilyList.addFilter(columnsList);
        list.addFilter(infoColFamilyList);
      }
    }
    return list;
  }

  /**
   * Reads the single flow run identified by cluster, user, flow name and flow
   * run id of the context, fetching all versions of its cells.
   *
   * @param hbaseConf HBase configuration handed to the table.
   * @param conn HBase connection handed to the table.
   * @param filterList column filters; ignored when null or empty.
   * @return the row of the flow run.
   * @throws IOException if the row cannot be read.
   */
  @Override
  protected Result getResult(Configuration hbaseConf, Connection conn,
      FilterList filterList) throws IOException {
    TimelineReaderContext context = getContext();
    FlowRunRowKey flowRunRowKey =
        new FlowRunRowKey(context.getClusterId(), context.getUserId(),
            context.getFlowName(), context.getFlowRunId());
    byte[] rowKey = flowRunRowKey.getRowKey();
    Get get = new Get(rowKey);
    get.setMaxVersions(Integer.MAX_VALUE);
    if (filterList != null && !filterList.getFilters().isEmpty()) {
      get.setFilter(filterList);
    }
    return getTable().getResult(hbaseConf, conn, get);
  }

  /**
   * Scans the flow runs that share the cluster, user and flow name of the
   * context. A page filter sized by the filters' limit is always applied and
   * the given filter list is added to it when it is not empty.
   *
   * @param hbaseConf HBase configuration handed to the table.
   * @param conn HBase connection handed to the table.
   * @param filterList additional filters; ignored when null or empty.
   * @return scanner over the matching rows.
   * @throws IOException if the scanner cannot be obtained.
   */
  @Override
  protected ResultScanner getResults(Configuration hbaseConf, Connection conn,
      FilterList filterList) throws IOException {
    Scan scan = new Scan();
    TimelineReaderContext context = getContext();
    RowKeyPrefix<FlowRunRowKey> flowRunRowKeyPrefix =
        new FlowRunRowKeyPrefix(context.getClusterId(), context.getUserId(),
            context.getFlowName());
    scan.setRowPrefixFilter(flowRunRowKeyPrefix.getRowKeyPrefix());
    FilterList newList = new FilterList();
    newList.addFilter(new PageFilter(getFilters().getLimit()));
    if (filterList != null && !filterList.getFilters().isEmpty()) {
      newList.addFilter(filterList);
    }
    scan.setFilter(newList);
    scan.setMaxVersions(Integer.MAX_VALUE);
    return getTable().getResultScanner(hbaseConf, conn, scan);
  }

  /**
   * Builds a {@link FlowRunEntity} from one row. User and flow name come from
   * the context; the run id comes from the context for a single entity read
   * and from the row key otherwise. Start time, end time and version are set
   * only when present in the row.
   *
   * @param result row read from the flow run table.
   * @return the flow run entity for that row.
   * @throws IOException if a column cannot be read.
   */
  @Override
  protected TimelineEntity parseEntity(Result result) throws IOException {
    TimelineReaderContext context = getContext();
    FlowRunEntity flowRun = new FlowRunEntity();
    flowRun.setUser(context.getUserId());
    flowRun.setName(context.getFlowName());
    if (isSingleEntityRead()) {
      flowRun.setRunId(context.getFlowRunId());
    } else {
      FlowRunRowKey rowKey = FlowRunRowKey.parseRowKey(result.getRow());
      flowRun.setRunId(rowKey.getFlowRunId());
    }

    // read the start time
    Long startTime = (Long) FlowRunColumn.MIN_START_TIME.readResult(result);
    if (startTime != null) {
      flowRun.setStartTime(startTime.longValue());
    }

    // read the end time if available
    Long endTime = (Long) FlowRunColumn.MAX_END_TIME.readResult(result);
    if (endTime != null) {
      flowRun.setMaxEndTime(endTime.longValue());
    }

    // read the flow version
    String version = (String) FlowRunColumn.FLOW_VERSION.readResult(result);
    if (version != null) {
      flowRun.setVersion(version);
    }

    // read metrics if its a single entity query or if METRICS are part of
    // fieldsToRetrieve.
    if (isSingleEntityRead()
        || hasField(getDataToRetrieve().getFieldsToRetrieve(), Field.METRICS)) {
      readMetrics(flowRun, result, FlowRunColumnPrefix.METRIC);
    }

    // set the id
    flowRun.setId(flowRun.getId());
    return flowRun;
  }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b01514f6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java deleted file mode 100644 index 4e1ab8a..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java +++ /dev/null @@ -1,648 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.server.timelineservice.storage.reader; - -import java.io.IOException; -import java.util.EnumSet; -import java.util.Map; -import java.util.Set; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.filter.BinaryComparator; -import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; -import org.apache.hadoop.hbase.filter.FamilyFilter; -import org.apache.hadoop.hbase.filter.FilterList; -import org.apache.hadoop.hbase.filter.FilterList.Operator; -import org.apache.hadoop.hbase.filter.QualifierFilter; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; -import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve; -import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters; -import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext; -import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList; -import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterUtils; -import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; -import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowColumn; -import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey; -import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix; -import 
org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.RowKeyPrefix; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.StringKeyConverter; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; -import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn; -import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnFamily; -import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix; -import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey; -import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKeyPrefix; -import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable; -import org.apache.hadoop.yarn.webapp.NotFoundException; - -import com.google.common.base.Preconditions; - -/** - * Timeline entity reader for generic entities that are stored in the entity - * table. - */ -class GenericEntityReader extends TimelineEntityReader { - private static final EntityTable ENTITY_TABLE = new EntityTable(); - - /** - * Used to look up the flow context. - */ - private final AppToFlowTable appToFlowTable = new AppToFlowTable(); - - /** - * Used to convert strings key components to and from storage format. - */ - private final KeyConverter<String> stringKeyConverter = - new StringKeyConverter(); - - public GenericEntityReader(TimelineReaderContext ctxt, - TimelineEntityFilters entityFilters, TimelineDataToRetrieve toRetrieve, - boolean sortedKeys) { - super(ctxt, entityFilters, toRetrieve, sortedKeys); - } - - public GenericEntityReader(TimelineReaderContext ctxt, - TimelineDataToRetrieve toRetrieve) { - super(ctxt, toRetrieve); - } - - /** - * Uses the {@link EntityTable}. 
- */ - protected BaseTable<?> getTable() { - return ENTITY_TABLE; - } - - @Override - protected FilterList constructFilterListBasedOnFilters() throws IOException { - // Filters here cannot be null for multiple entity reads as they are set in - // augmentParams if null. - FilterList listBasedOnFilters = new FilterList(); - TimelineEntityFilters filters = getFilters(); - // Create filter list based on created time range and add it to - // listBasedOnFilters. - long createdTimeBegin = filters.getCreatedTimeBegin(); - long createdTimeEnd = filters.getCreatedTimeEnd(); - if (createdTimeBegin != 0 || createdTimeEnd != Long.MAX_VALUE) { - listBasedOnFilters.addFilter(TimelineFilterUtils - .createSingleColValueFiltersByRange(EntityColumn.CREATED_TIME, - createdTimeBegin, createdTimeEnd)); - } - // Create filter list based on metric filters and add it to - // listBasedOnFilters. - TimelineFilterList metricFilters = filters.getMetricFilters(); - if (metricFilters != null && !metricFilters.getFilterList().isEmpty()) { - listBasedOnFilters.addFilter(TimelineFilterUtils.createHBaseFilterList( - EntityColumnPrefix.METRIC, metricFilters)); - } - // Create filter list based on config filters and add it to - // listBasedOnFilters. - TimelineFilterList configFilters = filters.getConfigFilters(); - if (configFilters != null && !configFilters.getFilterList().isEmpty()) { - listBasedOnFilters.addFilter(TimelineFilterUtils.createHBaseFilterList( - EntityColumnPrefix.CONFIG, configFilters)); - } - // Create filter list based on info filters and add it to listBasedOnFilters - TimelineFilterList infoFilters = filters.getInfoFilters(); - if (infoFilters != null && !infoFilters.getFilterList().isEmpty()) { - listBasedOnFilters.addFilter(TimelineFilterUtils.createHBaseFilterList( - EntityColumnPrefix.INFO, infoFilters)); - } - return listBasedOnFilters; - } - - /** - * Check if we need to fetch only some of the event columns. 
- * - * @return true if we need to fetch some of the columns, false otherwise. - */ - private boolean fetchPartialEventCols(TimelineFilterList eventFilters, - EnumSet<Field> fieldsToRetrieve) { - return (eventFilters != null && !eventFilters.getFilterList().isEmpty() && - !hasField(fieldsToRetrieve, Field.EVENTS)); - } - - /** - * Check if we need to fetch only some of the relates_to columns. - * - * @return true if we need to fetch some of the columns, false otherwise. - */ - private boolean fetchPartialRelatesToCols(TimelineFilterList relatesTo, - EnumSet<Field> fieldsToRetrieve) { - return (relatesTo != null && !relatesTo.getFilterList().isEmpty() && - !hasField(fieldsToRetrieve, Field.RELATES_TO)); - } - - /** - * Check if we need to fetch only some of the is_related_to columns. - * - * @return true if we need to fetch some of the columns, false otherwise. - */ - private boolean fetchPartialIsRelatedToCols(TimelineFilterList isRelatedTo, - EnumSet<Field> fieldsToRetrieve) { - return (isRelatedTo != null && !isRelatedTo.getFilterList().isEmpty() && - !hasField(fieldsToRetrieve, Field.IS_RELATED_TO)); - } - - /** - * Check if we need to fetch only some of the columns based on event filters, - * relatesto and isrelatedto from info family. - * - * @return true, if we need to fetch only some of the columns, false if we - * need to fetch all the columns under info column family. - */ - protected boolean fetchPartialColsFromInfoFamily() { - EnumSet<Field> fieldsToRetrieve = getDataToRetrieve().getFieldsToRetrieve(); - TimelineEntityFilters filters = getFilters(); - return fetchPartialEventCols(filters.getEventFilters(), fieldsToRetrieve) - || fetchPartialRelatesToCols(filters.getRelatesTo(), fieldsToRetrieve) - || fetchPartialIsRelatedToCols(filters.getIsRelatedTo(), - fieldsToRetrieve); - } - - /** - * Check if we need to create filter list based on fields. 
We need to create a - * filter list iff all fields need not be retrieved or we have some specific - * fields or metrics to retrieve. We also need to create a filter list if we - * have relationships(relatesTo/isRelatedTo) and event filters specified for - * the query. - * - * @return true if we need to create the filter list, false otherwise. - */ - protected boolean needCreateFilterListBasedOnFields() { - TimelineDataToRetrieve dataToRetrieve = getDataToRetrieve(); - // Check if all fields are to be retrieved or not. If all fields have to - // be retrieved, also check if we have some metrics or configs to - // retrieve specified for the query because then a filter list will have - // to be created. - boolean flag = - !dataToRetrieve.getFieldsToRetrieve().contains(Field.ALL) - || (dataToRetrieve.getConfsToRetrieve() != null && !dataToRetrieve - .getConfsToRetrieve().getFilterList().isEmpty()) - || (dataToRetrieve.getMetricsToRetrieve() != null && !dataToRetrieve - .getMetricsToRetrieve().getFilterList().isEmpty()); - // Filters need to be checked only if we are reading multiple entities. If - // condition above is false, we check if there are relationships(relatesTo/ - // isRelatedTo) and event filters specified for the query. - if (!flag && !isSingleEntityRead()) { - TimelineEntityFilters filters = getFilters(); - flag = - (filters.getEventFilters() != null && !filters.getEventFilters() - .getFilterList().isEmpty()) - || (filters.getIsRelatedTo() != null && !filters.getIsRelatedTo() - .getFilterList().isEmpty()) - || (filters.getRelatesTo() != null && !filters.getRelatesTo() - .getFilterList().isEmpty()); - } - return flag; - } - - /** - * Add {@link QualifierFilter} filters to filter list for each column of - * entity table. - * - * @param list filter list to which qualifier filters have to be added. 
- */ - protected void updateFixedColumns(FilterList list) { - for (EntityColumn column : EntityColumn.values()) { - list.addFilter(new QualifierFilter(CompareOp.EQUAL, new BinaryComparator( - column.getColumnQualifierBytes()))); - } - } - - /** - * Creates a filter list which indicates that only some of the column - * qualifiers in the info column family will be returned in result. - * - * @param isApplication If true, it means operations are to be performed for - * application table, otherwise for entity table. - * @return filter list. - * @throws IOException if any problem occurs while creating filter list. - */ - private FilterList createFilterListForColsOfInfoFamily() throws IOException { - FilterList infoFamilyColsFilter = new FilterList(Operator.MUST_PASS_ONE); - // Add filters for each column in entity table. - updateFixedColumns(infoFamilyColsFilter); - EnumSet<Field> fieldsToRetrieve = getDataToRetrieve().getFieldsToRetrieve(); - // If INFO field has to be retrieved, add a filter for fetching columns - // with INFO column prefix. - if (hasField(fieldsToRetrieve, Field.INFO)) { - infoFamilyColsFilter - .addFilter(TimelineFilterUtils.createHBaseQualifierFilter( - CompareOp.EQUAL, EntityColumnPrefix.INFO)); - } - TimelineFilterList relatesTo = getFilters().getRelatesTo(); - if (hasField(fieldsToRetrieve, Field.RELATES_TO)) { - // If RELATES_TO field has to be retrieved, add a filter for fetching - // columns with RELATES_TO column prefix. - infoFamilyColsFilter.addFilter(TimelineFilterUtils - .createHBaseQualifierFilter(CompareOp.EQUAL, - EntityColumnPrefix.RELATES_TO)); - } else if (relatesTo != null && !relatesTo.getFilterList().isEmpty()) { - // Even if fields to retrieve does not contain RELATES_TO, we still - // need to have a filter to fetch some of the column qualifiers if - // relatesTo filters are specified. relatesTo filters will then be - // matched after fetching rows from HBase. 
- Set<String> relatesToCols = - TimelineFilterUtils.fetchColumnsFromFilterList(relatesTo); - infoFamilyColsFilter.addFilter(createFiltersFromColumnQualifiers( - EntityColumnPrefix.RELATES_TO, relatesToCols)); - } - TimelineFilterList isRelatedTo = getFilters().getIsRelatedTo(); - if (hasField(fieldsToRetrieve, Field.IS_RELATED_TO)) { - // If IS_RELATED_TO field has to be retrieved, add a filter for fetching - // columns with IS_RELATED_TO column prefix. - infoFamilyColsFilter.addFilter(TimelineFilterUtils - .createHBaseQualifierFilter(CompareOp.EQUAL, - EntityColumnPrefix.IS_RELATED_TO)); - } else if (isRelatedTo != null && !isRelatedTo.getFilterList().isEmpty()) { - // Even if fields to retrieve does not contain IS_RELATED_TO, we still - // need to have a filter to fetch some of the column qualifiers if - // isRelatedTo filters are specified. isRelatedTo filters will then be - // matched after fetching rows from HBase. - Set<String> isRelatedToCols = - TimelineFilterUtils.fetchColumnsFromFilterList(isRelatedTo); - infoFamilyColsFilter.addFilter(createFiltersFromColumnQualifiers( - EntityColumnPrefix.IS_RELATED_TO, isRelatedToCols)); - } - TimelineFilterList eventFilters = getFilters().getEventFilters(); - if (hasField(fieldsToRetrieve, Field.EVENTS)) { - // If EVENTS field has to be retrieved, add a filter for fetching columns - // with EVENT column prefix. - infoFamilyColsFilter - .addFilter(TimelineFilterUtils.createHBaseQualifierFilter( - CompareOp.EQUAL, EntityColumnPrefix.EVENT)); - } else if (eventFilters != null && - !eventFilters.getFilterList().isEmpty()) { - // Even if fields to retrieve does not contain EVENTS, we still need to - // have a filter to fetch some of the column qualifiers on the basis of - // event filters specified. Event filters will then be matched after - // fetching rows from HBase. 
- Set<String> eventCols = - TimelineFilterUtils.fetchColumnsFromFilterList(eventFilters); - infoFamilyColsFilter.addFilter(createFiltersFromColumnQualifiers( - EntityColumnPrefix.EVENT, eventCols)); - } - return infoFamilyColsFilter; - } - - /** - * Exclude column prefixes via filters which are not required(based on fields - * to retrieve) from info column family. These filters are added to filter - * list which contains a filter for getting info column family. - * - * @param infoColFamilyList filter list for info column family. - */ - private void excludeFieldsFromInfoColFamily(FilterList infoColFamilyList) { - EnumSet<Field> fieldsToRetrieve = getDataToRetrieve().getFieldsToRetrieve(); - // Events not required. - if (!hasField(fieldsToRetrieve, Field.EVENTS)) { - infoColFamilyList.addFilter(TimelineFilterUtils - .createHBaseQualifierFilter(CompareOp.NOT_EQUAL, - EntityColumnPrefix.EVENT)); - } - // info not required. - if (!hasField(fieldsToRetrieve, Field.INFO)) { - infoColFamilyList.addFilter(TimelineFilterUtils - .createHBaseQualifierFilter(CompareOp.NOT_EQUAL, - EntityColumnPrefix.INFO)); - } - // is related to not required. - if (!hasField(fieldsToRetrieve, Field.IS_RELATED_TO)) { - infoColFamilyList.addFilter(TimelineFilterUtils - .createHBaseQualifierFilter(CompareOp.NOT_EQUAL, - EntityColumnPrefix.IS_RELATED_TO)); - } - // relates to not required. - if (!hasField(fieldsToRetrieve, Field.RELATES_TO)) { - infoColFamilyList.addFilter(TimelineFilterUtils - .createHBaseQualifierFilter(CompareOp.NOT_EQUAL, - EntityColumnPrefix.RELATES_TO)); - } - } - - /** - * Updates filter list based on fields for confs and metrics to retrieve. - * - * @param listBasedOnFields filter list based on fields. - * @throws IOException if any problem occurs while updating filter list. 
- */ - private void updateFilterForConfsAndMetricsToRetrieve( - FilterList listBasedOnFields) throws IOException { - TimelineDataToRetrieve dataToRetrieve = getDataToRetrieve(); - // Please note that if confsToRetrieve is specified, we would have added - // CONFS to fields to retrieve in augmentParams() even if not specified. - if (dataToRetrieve.getFieldsToRetrieve().contains(Field.CONFIGS)) { - // Create a filter list for configs. - listBasedOnFields.addFilter(TimelineFilterUtils - .createFilterForConfsOrMetricsToRetrieve( - dataToRetrieve.getConfsToRetrieve(), EntityColumnFamily.CONFIGS, - EntityColumnPrefix.CONFIG)); - } - - // Please note that if metricsToRetrieve is specified, we would have added - // METRICS to fields to retrieve in augmentParams() even if not specified. - if (dataToRetrieve.getFieldsToRetrieve().contains(Field.METRICS)) { - // Create a filter list for metrics. - listBasedOnFields.addFilter(TimelineFilterUtils - .createFilterForConfsOrMetricsToRetrieve( - dataToRetrieve.getMetricsToRetrieve(), - EntityColumnFamily.METRICS, EntityColumnPrefix.METRIC)); - } - } - - @Override - protected FilterList constructFilterListBasedOnFields() throws IOException { - if (!needCreateFilterListBasedOnFields()) { - // Fetch all the columns. No need of a filter. - return null; - } - FilterList listBasedOnFields = new FilterList(Operator.MUST_PASS_ONE); - FilterList infoColFamilyList = new FilterList(); - // By default fetch everything in INFO column family. - FamilyFilter infoColumnFamily = - new FamilyFilter(CompareOp.EQUAL, new BinaryComparator( - EntityColumnFamily.INFO.getBytes())); - infoColFamilyList.addFilter(infoColumnFamily); - if (!isSingleEntityRead() && fetchPartialColsFromInfoFamily()) { - // We can fetch only some of the columns from info family. - infoColFamilyList.addFilter(createFilterListForColsOfInfoFamily()); - } else { - // Exclude column prefixes in info column family which are not required - // based on fields to retrieve. 
- excludeFieldsFromInfoColFamily(infoColFamilyList); - } - listBasedOnFields.addFilter(infoColFamilyList); - updateFilterForConfsAndMetricsToRetrieve(listBasedOnFields); - return listBasedOnFields; - } - - /** - * Looks up flow context from AppToFlow table. - * - * @param appToFlowRowKey to identify Cluster and App Ids. - * @param hbaseConf HBase configuration. - * @param conn HBase Connection. - * @return flow context information. - * @throws IOException if any problem occurs while fetching flow information. - */ - protected FlowContext lookupFlowContext(AppToFlowRowKey appToFlowRowKey, - Configuration hbaseConf, Connection conn) throws IOException { - byte[] rowKey = appToFlowRowKey.getRowKey(); - Get get = new Get(rowKey); - Result result = appToFlowTable.getResult(hbaseConf, conn, get); - if (result != null && !result.isEmpty()) { - return new FlowContext(AppToFlowColumn.USER_ID.readResult(result) - .toString(), AppToFlowColumn.FLOW_ID.readResult(result).toString(), - ((Number) AppToFlowColumn.FLOW_RUN_ID.readResult(result)) - .longValue()); - } else { - throw new NotFoundException( - "Unable to find the context flow ID and flow run ID for clusterId=" - + appToFlowRowKey.getClusterId() + ", appId=" - + appToFlowRowKey.getAppId()); - } - } - - /** - * Encapsulates flow context information. 
- */ - protected static class FlowContext { - private final String userId; - private final String flowName; - private final Long flowRunId; - - public FlowContext(String user, String flowName, Long flowRunId) { - this.userId = user; - this.flowName = flowName; - this.flowRunId = flowRunId; - } - - protected String getUserId() { - return userId; - } - - protected String getFlowName() { - return flowName; - } - - protected Long getFlowRunId() { - return flowRunId; - } - } - - @Override - protected void validateParams() { - Preconditions.checkNotNull(getContext(), "context shouldn't be null"); - Preconditions.checkNotNull(getDataToRetrieve(), - "data to retrieve shouldn't be null"); - Preconditions.checkNotNull(getContext().getClusterId(), - "clusterId shouldn't be null"); - Preconditions.checkNotNull(getContext().getAppId(), - "appId shouldn't be null"); - Preconditions.checkNotNull(getContext().getEntityType(), - "entityType shouldn't be null"); - if (isSingleEntityRead()) { - Preconditions.checkNotNull(getContext().getEntityId(), - "entityId shouldn't be null"); - } - } - - @Override - protected void augmentParams(Configuration hbaseConf, Connection conn) - throws IOException { - TimelineReaderContext context = getContext(); - // In reality all three should be null or neither should be null - if (context.getFlowName() == null || context.getFlowRunId() == null - || context.getUserId() == null) { - // Get flow context information from AppToFlow table. - AppToFlowRowKey appToFlowRowKey = - new AppToFlowRowKey(context.getClusterId(), context.getAppId()); - FlowContext flowContext = - lookupFlowContext(appToFlowRowKey, hbaseConf, conn); - context.setFlowName(flowContext.flowName); - context.setFlowRunId(flowContext.flowRunId); - context.setUserId(flowContext.userId); - } - // Add configs/metrics to fields to retrieve if confsToRetrieve and/or - // metricsToRetrieve are specified. 
- getDataToRetrieve().addFieldsBasedOnConfsAndMetricsToRetrieve(); - if (!isSingleEntityRead()) { - createFiltersIfNull(); - } - } - - @Override - protected Result getResult(Configuration hbaseConf, Connection conn, - FilterList filterList) throws IOException { - TimelineReaderContext context = getContext(); - byte[] rowKey = - new EntityRowKey(context.getClusterId(), context.getUserId(), - context.getFlowName(), context.getFlowRunId(), context.getAppId(), - context.getEntityType(), context.getEntityId()).getRowKey(); - Get get = new Get(rowKey); - get.setMaxVersions(getDataToRetrieve().getMetricsLimit()); - if (filterList != null && !filterList.getFilters().isEmpty()) { - get.setFilter(filterList); - } - return getTable().getResult(hbaseConf, conn, get); - } - - @Override - protected ResultScanner getResults(Configuration hbaseConf, Connection conn, - FilterList filterList) throws IOException { - // Scan through part of the table to find the entities belong to one app - // and one type - Scan scan = new Scan(); - TimelineReaderContext context = getContext(); - RowKeyPrefix<EntityRowKey> entityRowKeyPrefix = - new EntityRowKeyPrefix(context.getClusterId(), context.getUserId(), - context.getFlowName(), context.getFlowRunId(), context.getAppId(), - context.getEntityType()); - scan.setRowPrefixFilter(entityRowKeyPrefix.getRowKeyPrefix()); - scan.setMaxVersions(getDataToRetrieve().getMetricsLimit()); - if (filterList != null && !filterList.getFilters().isEmpty()) { - scan.setFilter(filterList); - } - return getTable().getResultScanner(hbaseConf, conn, scan); - } - - @Override - protected TimelineEntity parseEntity(Result result) throws IOException { - if (result == null || result.isEmpty()) { - return null; - } - TimelineEntity entity = new TimelineEntity(); - String entityType = EntityColumn.TYPE.readResult(result).toString(); - entity.setType(entityType); - String entityId = EntityColumn.ID.readResult(result).toString(); - entity.setId(entityId); - - 
TimelineEntityFilters filters = getFilters(); - // fetch created time - Long createdTime = (Long) EntityColumn.CREATED_TIME.readResult(result); - entity.setCreatedTime(createdTime); - - EnumSet<Field> fieldsToRetrieve = getDataToRetrieve().getFieldsToRetrieve(); - // fetch is related to entities and match isRelatedTo filter. If isRelatedTo - // filters do not match, entity would be dropped. We have to match filters - // locally as relevant HBase filters to filter out rows on the basis of - // isRelatedTo are not set in HBase scan. - boolean checkIsRelatedTo = - !isSingleEntityRead() && filters.getIsRelatedTo() != null - && filters.getIsRelatedTo().getFilterList().size() > 0; - if (hasField(fieldsToRetrieve, Field.IS_RELATED_TO) || checkIsRelatedTo) { - readRelationship(entity, result, EntityColumnPrefix.IS_RELATED_TO, true); - if (checkIsRelatedTo - && !TimelineStorageUtils.matchIsRelatedTo(entity, - filters.getIsRelatedTo())) { - return null; - } - if (!hasField(fieldsToRetrieve, Field.IS_RELATED_TO)) { - entity.getIsRelatedToEntities().clear(); - } - } - - // fetch relates to entities and match relatesTo filter. If relatesTo - // filters do not match, entity would be dropped. We have to match filters - // locally as relevant HBase filters to filter out rows on the basis of - // relatesTo are not set in HBase scan. - boolean checkRelatesTo = - !isSingleEntityRead() && filters.getRelatesTo() != null - && filters.getRelatesTo().getFilterList().size() > 0; - if (hasField(fieldsToRetrieve, Field.RELATES_TO) - || checkRelatesTo) { - readRelationship(entity, result, EntityColumnPrefix.RELATES_TO, false); - if (checkRelatesTo - && !TimelineStorageUtils.matchRelatesTo(entity, - filters.getRelatesTo())) { - return null; - } - if (!hasField(fieldsToRetrieve, Field.RELATES_TO)) { - entity.getRelatesToEntities().clear(); - } - } - - // fetch info if fieldsToRetrieve contains INFO or ALL. 
- if (hasField(fieldsToRetrieve, Field.INFO)) { - readKeyValuePairs(entity, result, EntityColumnPrefix.INFO, false); - } - - // fetch configs if fieldsToRetrieve contains CONFIGS or ALL. - if (hasField(fieldsToRetrieve, Field.CONFIGS)) { - readKeyValuePairs(entity, result, EntityColumnPrefix.CONFIG, true); - } - - // fetch events and match event filters if they exist. If event filters do - // not match, entity would be dropped. We have to match filters locally - // as relevant HBase filters to filter out rows on the basis of events - // are not set in HBase scan. - boolean checkEvents = - !isSingleEntityRead() && filters.getEventFilters() != null - && filters.getEventFilters().getFilterList().size() > 0; - if (hasField(fieldsToRetrieve, Field.EVENTS) || checkEvents) { - readEvents(entity, result, EntityColumnPrefix.EVENT); - if (checkEvents - && !TimelineStorageUtils.matchEventFilters(entity, - filters.getEventFilters())) { - return null; - } - if (!hasField(fieldsToRetrieve, Field.EVENTS)) { - entity.getEvents().clear(); - } - } - - // fetch metrics if fieldsToRetrieve contains METRICS or ALL. - if (hasField(fieldsToRetrieve, Field.METRICS)) { - readMetrics(entity, result, EntityColumnPrefix.METRIC); - } - return entity; - } - - /** - * Helper method for reading key-value pairs for either info or config. - * - * @param <T> Describes the type of column prefix. - * @param entity entity to fill. - * @param result result from HBase. - * @param prefix column prefix. - * @param isConfig if true, means we are reading configs, otherwise info. - * @throws IOException if any problem is encountered while reading result. 
- */ - protected <T> void readKeyValuePairs(TimelineEntity entity, Result result, - ColumnPrefix<T> prefix, boolean isConfig) throws IOException { - // info and configuration are of type Map<String, Object or String> - Map<String, Object> columns = - prefix.readResults(result, stringKeyConverter); - if (isConfig) { - for (Map.Entry<String, Object> column : columns.entrySet()) { - entity.addConfig(column.getKey(), column.getValue().toString()); - } - } else { - entity.addInfo(columns); - } - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b01514f6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java deleted file mode 100644 index 7b294a8..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java +++ /dev/null @@ -1,496 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.server.timelineservice.storage.reader; - -import java.io.IOException; -import java.util.EnumSet; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.NavigableMap; -import java.util.NavigableSet; -import java.util.Set; -import java.util.TreeSet; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.filter.BinaryPrefixComparator; -import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; -import org.apache.hadoop.hbase.filter.FilterList; -import org.apache.hadoop.hbase.filter.FilterList.Operator; -import org.apache.hadoop.hbase.filter.QualifierFilter; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; -import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve; -import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters; -import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext; -import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; -import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix; -import 
org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.EventColumnName; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.EventColumnNameConverter; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.StringKeyConverter; -import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix; - -/** - * The base class for reading and deserializing timeline entities from the - * HBase storage. Different types can be defined for different types of the - * entities that are being requested. - */ -public abstract class TimelineEntityReader { - private static final Log LOG = LogFactory.getLog(TimelineEntityReader.class); - - private final boolean singleEntityRead; - private TimelineReaderContext context; - private TimelineDataToRetrieve dataToRetrieve; - // used only for multiple entity read mode - private TimelineEntityFilters filters; - - /** - * Main table the entity reader uses. - */ - private BaseTable<?> table; - - /** - * Specifies whether keys for this table are sorted in a manner where entities - * can be retrieved by created time. If true, it will be sufficient to collect - * the first results as specified by the limit. Otherwise all matched entities - * will be fetched and then limit applied. - */ - private boolean sortedKeys = false; - - /** - * Used to convert strings key components to and from storage format. - */ - private final KeyConverter<String> stringKeyConverter = - new StringKeyConverter(); - - /** - * Instantiates a reader for multiple-entity reads. - * - * @param ctxt Reader context which defines the scope in which query has to be - * made. 
- * @param entityFilters Filters which limit the entities returned. - * @param toRetrieve Data to retrieve for each entity. - * @param sortedKeys Specifies whether key for this table are sorted or not. - * If sorted, entities can be retrieved by created time. - */ - protected TimelineEntityReader(TimelineReaderContext ctxt, - TimelineEntityFilters entityFilters, TimelineDataToRetrieve toRetrieve, - boolean sortedKeys) { - this.singleEntityRead = false; - this.sortedKeys = sortedKeys; - this.context = ctxt; - this.dataToRetrieve = toRetrieve; - this.filters = entityFilters; - - this.setTable(getTable()); - } - - /** - * Instantiates a reader for single-entity reads. - * - * @param ctxt Reader context which defines the scope in which query has to be - * made. - * @param toRetrieve Data to retrieve for each entity. - */ - protected TimelineEntityReader(TimelineReaderContext ctxt, - TimelineDataToRetrieve toRetrieve) { - this.singleEntityRead = true; - this.context = ctxt; - this.dataToRetrieve = toRetrieve; - - this.setTable(getTable()); - } - - /** - * Creates a {@link FilterList} based on fields, confs and metrics to - * retrieve. This filter list will be set in Scan/Get objects to trim down - * results fetched from HBase back-end storage. This is called only for - * multiple entity reads. - * - * @return a {@link FilterList} object. - * @throws IOException if any problem occurs while creating filter list. - */ - protected abstract FilterList constructFilterListBasedOnFields() - throws IOException; - - /** - * Creates a {@link FilterList} based on info, config and metric filters. This - * filter list will be set in HBase Get to trim down results fetched from - * HBase back-end storage. - * - * @return a {@link FilterList} object. - * @throws IOException if any problem occurs while creating filter list. 
- */ - protected abstract FilterList constructFilterListBasedOnFilters() - throws IOException; - - /** - * Combines filter lists created based on fields and based on filters. - * - * @return a {@link FilterList} object if it can be constructed. Returns null, - * if filter list cannot be created either on the basis of filters or on the - * basis of fields. - * @throws IOException if any problem occurs while creating filter list. - */ - private FilterList createFilterList() throws IOException { - FilterList listBasedOnFilters = constructFilterListBasedOnFilters(); - boolean hasListBasedOnFilters = listBasedOnFilters != null && - !listBasedOnFilters.getFilters().isEmpty(); - FilterList listBasedOnFields = constructFilterListBasedOnFields(); - boolean hasListBasedOnFields = listBasedOnFields != null && - !listBasedOnFields.getFilters().isEmpty(); - // If filter lists based on both filters and fields can be created, - // combine them in a new filter list and return it. - // If either one of them has been created, return that filter list. - // Return null, if none of the filter lists can be created. This indicates - // that no filter list needs to be added to HBase Scan as filters are not - // specified for the query or only the default view of entity needs to be - // returned. - if (hasListBasedOnFilters && hasListBasedOnFields) { - FilterList list = new FilterList(); - list.addFilter(listBasedOnFilters); - list.addFilter(listBasedOnFields); - return list; - } else if (hasListBasedOnFilters) { - return listBasedOnFilters; - } else if (hasListBasedOnFields) { - return listBasedOnFields; - } - return null; - } - - protected TimelineReaderContext getContext() { - return context; - } - - protected TimelineDataToRetrieve getDataToRetrieve() { - return dataToRetrieve; - } - - protected TimelineEntityFilters getFilters() { - return filters; - } - - /** - * Create a {@link TimelineEntityFilters} object with default values for - * filters. 
- */ - protected void createFiltersIfNull() { - if (filters == null) { - filters = new TimelineEntityFilters(); - } - } - - /** - * Reads and deserializes a single timeline entity from the HBase storage. - * - * @param hbaseConf HBase Configuration. - * @param conn HBase Connection. - * @return A <cite>TimelineEntity</cite> object. - * @throws IOException if there is any exception encountered while reading - * entity. - */ - public TimelineEntity readEntity(Configuration hbaseConf, Connection conn) - throws IOException { - validateParams(); - augmentParams(hbaseConf, conn); - - FilterList filterList = constructFilterListBasedOnFields(); - if (LOG.isDebugEnabled() && filterList != null) { - LOG.debug("FilterList created for get is - " + filterList); - } - Result result = getResult(hbaseConf, conn, filterList); - if (result == null || result.isEmpty()) { - // Could not find a matching row. - LOG.info("Cannot find matching entity of type " + - context.getEntityType()); - return null; - } - return parseEntity(result); - } - - /** - * Reads and deserializes a set of timeline entities from the HBase storage. - * It goes through all the results available, and returns the number of - * entries as specified in the limit in the entity's natural sort order. - * - * @param hbaseConf HBase Configuration. - * @param conn HBase Connection. - * @return a set of <cite>TimelineEntity</cite> objects. - * @throws IOException if any exception is encountered while reading entities. 
- */ - public Set<TimelineEntity> readEntities(Configuration hbaseConf, - Connection conn) throws IOException { - validateParams(); - augmentParams(hbaseConf, conn); - - NavigableSet<TimelineEntity> entities = new TreeSet<>(); - FilterList filterList = createFilterList(); - if (LOG.isDebugEnabled() && filterList != null) { - LOG.debug("FilterList created for scan is - " + filterList); - } - ResultScanner results = getResults(hbaseConf, conn, filterList); - try { - for (Result result : results) { - TimelineEntity entity = parseEntity(result); - if (entity == null) { - continue; - } - entities.add(entity); - if (!sortedKeys) { - if (entities.size() > filters.getLimit()) { - entities.pollLast(); - } - } else { - if (entities.size() == filters.getLimit()) { - break; - } - } - } - return entities; - } finally { - results.close(); - } - } - - /** - * Returns the main table to be used by the entity reader. - * - * @return A reference to the table. - */ - protected BaseTable<?> getTable() { - return table; - } - - /** - * Validates the required parameters to read the entities. - */ - protected abstract void validateParams(); - - /** - * Sets certain parameters to defaults if the values are not provided. - * - * @param hbaseConf HBase Configuration. - * @param conn HBase Connection. - * @throws IOException if any exception is encountered while setting params. - */ - protected abstract void augmentParams(Configuration hbaseConf, - Connection conn) throws IOException; - - /** - * Fetches a {@link Result} instance for a single-entity read. - * - * @param hbaseConf HBase Configuration. - * @param conn HBase Connection. - * @param filterList filter list which will be applied to HBase Get. - * @return the {@link Result} instance or null if no such record is found. - * @throws IOException if any exception is encountered while getting result. 
- */ - protected abstract Result getResult(Configuration hbaseConf, Connection conn, - FilterList filterList) throws IOException; - - /** - * Fetches a {@link ResultScanner} for a multi-entity read. - * - * @param hbaseConf HBase Configuration. - * @param conn HBase Connection. - * @param filterList filter list which will be applied to HBase Scan. - * @return the {@link ResultScanner} instance. - * @throws IOException if any exception is encountered while getting results. - */ - protected abstract ResultScanner getResults(Configuration hbaseConf, - Connection conn, FilterList filterList) throws IOException; - - /** - * Parses the result retrieved from HBase backend and convert it into a - * {@link TimelineEntity} object. - * - * @param result Single row result of a Get/Scan. - * @return the <cite>TimelineEntity</cite> instance or null if the entity is - * filtered. - * @throws IOException if any exception is encountered while parsing entity. - */ - protected abstract TimelineEntity parseEntity(Result result) - throws IOException; - - /** - * Helper method for reading and deserializing {@link TimelineMetric} objects - * using the specified column prefix. The timeline metrics then are added to - * the given timeline entity. - * - * @param entity {@link TimelineEntity} object. - * @param result {@link Result} object retrieved from backend. - * @param columnPrefix Metric column prefix - * @throws IOException if any exception is encountered while reading metrics. 
- */ - protected void readMetrics(TimelineEntity entity, Result result, - ColumnPrefix<?> columnPrefix) throws IOException { - NavigableMap<String, NavigableMap<Long, Number>> metricsResult = - columnPrefix.readResultsWithTimestamps( - result, stringKeyConverter); - for (Map.Entry<String, NavigableMap<Long, Number>> metricResult: - metricsResult.entrySet()) { - TimelineMetric metric = new TimelineMetric(); - metric.setId(metricResult.getKey()); - // Simply assume that if the value set contains more than 1 elements, the - // metric is a TIME_SERIES metric, otherwise, it's a SINGLE_VALUE metric - TimelineMetric.Type metricType = metricResult.getValue().size() > 1 ? - TimelineMetric.Type.TIME_SERIES : TimelineMetric.Type.SINGLE_VALUE; - metric.setType(metricType); - metric.addValues(metricResult.getValue()); - entity.addMetric(metric); - } - } - - /** - * Checks whether the reader has been created to fetch single entity or - * multiple entities. - * - * @return true, if query is for single entity, false otherwise. - */ - public boolean isSingleEntityRead() { - return singleEntityRead; - } - - protected void setTable(BaseTable<?> baseTable) { - this.table = baseTable; - } - - /** - * Check if we have a certain field amongst fields to retrieve. This method - * checks against {@link Field#ALL} as well because that would mean field - * passed needs to be matched. - * - * @param fieldsToRetrieve fields to be retrieved. - * @param requiredField fields to be checked in fieldsToRetrieve. - * @return true if has the required field, false otherwise. - */ - protected boolean hasField(EnumSet<Field> fieldsToRetrieve, - Field requiredField) { - return fieldsToRetrieve.contains(Field.ALL) || - fieldsToRetrieve.contains(requiredField); - } - - /** - * Create a filter list of qualifier filters based on passed set of columns. - * - * @param <T> Describes the type of column prefix. - * @param colPrefix Column Prefix. - * @param columns set of column qualifiers. - * @return filter list. 
- */ - protected <T> FilterList createFiltersFromColumnQualifiers( - ColumnPrefix<T> colPrefix, Set<String> columns) { - FilterList list = new FilterList(Operator.MUST_PASS_ONE); - for (String column : columns) { - // For columns which have compound column qualifiers (eg. events), we need - // to include the required separator. - byte[] compoundColQual = createColQualifierPrefix(colPrefix, column); - list.addFilter(new QualifierFilter(CompareOp.EQUAL, - new BinaryPrefixComparator(colPrefix - .getColumnPrefixBytes(compoundColQual)))); - } - return list; - } - - protected <T> byte[] createColQualifierPrefix(ColumnPrefix<T> colPrefix, - String column) { - if (colPrefix == ApplicationColumnPrefix.EVENT - || colPrefix == EntityColumnPrefix.EVENT) { - return new EventColumnName(column, null, null).getColumnQualifier(); - } else { - return stringKeyConverter.encode(column); - } - } - - /** - * Helper method for reading relationship. - * - * @param <T> Describes the type of column prefix. - * @param entity entity to fill. - * @param result result from HBase. - * @param prefix column prefix. - * @param isRelatedTo if true, means relationship is to be added to - * isRelatedTo, otherwise its added to relatesTo. - * @throws IOException if any problem is encountered while reading result. - */ - protected <T> void readRelationship(TimelineEntity entity, Result result, - ColumnPrefix<T> prefix, boolean isRelatedTo) throws IOException { - // isRelatedTo and relatesTo are of type Map<String, Set<String>> - Map<String, Object> columns = - prefix.readResults(result, stringKeyConverter); - for (Map.Entry<String, Object> column : columns.entrySet()) { - for (String id : Separator.VALUES.splitEncoded(column.getValue() - .toString())) { - if (isRelatedTo) { - entity.addIsRelatedToEntity(column.getKey(), id); - } else { - entity.addRelatesToEntity(column.getKey(), id); - } - } - } - } - - /** - * Read events from the entity table or the application table. 
The column name - * is of the form "eventId=timestamp=infoKey" where "infoKey" may be omitted - * if there is no info associated with the event. - * - * @param <T> Describes the type of column prefix. - * @param entity entity to fill. - * @param result HBase Result. - * @param prefix column prefix. - * @throws IOException if any problem is encountered while reading result. - */ - protected static <T> void readEvents(TimelineEntity entity, Result result, - ColumnPrefix<T> prefix) throws IOException { - Map<String, TimelineEvent> eventsMap = new HashMap<>(); - Map<EventColumnName, Object> eventsResult = - prefix.readResults(result, new EventColumnNameConverter()); - for (Map.Entry<EventColumnName, Object> - eventResult : eventsResult.entrySet()) { - EventColumnName eventColumnName = eventResult.getKey(); - String key = eventColumnName.getId() + - Long.toString(eventColumnName.getTimestamp()); - // Retrieve previously seen event to add to it - TimelineEvent event = eventsMap.get(key); - if (event == null) { - // First time we're seeing this event, add it to the eventsMap - event = new TimelineEvent(); - event.setId(eventColumnName.getId()); - event.setTimestamp(eventColumnName.getTimestamp()); - eventsMap.put(key, event); - } - if (eventColumnName.getInfoKey() != null) { - event.addInfo(eventColumnName.getInfoKey(), eventResult.getValue()); - } - } - Set<TimelineEvent> eventsSet = new HashSet<>(eventsMap.values()); - entity.addEvents(eventsSet); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b01514f6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReaderFactory.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReaderFactory.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReaderFactory.java deleted file mode 100644 index b2a9476..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReaderFactory.java +++ /dev/null @@ -1,89 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.server.timelineservice.storage.reader; - -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; -import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve; -import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters; -import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext; - -/** - * Factory methods for instantiating a timeline entity reader. 
- */ -public final class TimelineEntityReaderFactory { - private TimelineEntityReaderFactory() { - } - - /** - * Creates a timeline entity reader instance for reading a single entity with - * the specified input. - * - * @param context Reader context which defines the scope in which query has to - * be made. - * @param dataToRetrieve Data to retrieve for each entity. - * @return An implementation of <cite>TimelineEntityReader</cite> object - * depending on entity type. - */ - public static TimelineEntityReader createSingleEntityReader( - TimelineReaderContext context, TimelineDataToRetrieve dataToRetrieve) { - // currently the types that are handled separate from the generic entity - // table are application, flow run, and flow activity entities - if (TimelineEntityType.YARN_APPLICATION.matches(context.getEntityType())) { - return new ApplicationEntityReader(context, dataToRetrieve); - } else if (TimelineEntityType. - YARN_FLOW_RUN.matches(context.getEntityType())) { - return new FlowRunEntityReader(context, dataToRetrieve); - } else if (TimelineEntityType. - YARN_FLOW_ACTIVITY.matches(context.getEntityType())) { - return new FlowActivityEntityReader(context, dataToRetrieve); - } else { - // assume we're dealing with a generic entity read - return new GenericEntityReader(context, dataToRetrieve); - } - } - - /** - * Creates a timeline entity reader instance for reading set of entities with - * the specified input and predicates. - * - * @param context Reader context which defines the scope in which query has to - * be made. - * @param filters Filters which limit the entities returned. - * @param dataToRetrieve Data to retrieve for each entity. - * @return An implementation of <cite>TimelineEntityReader</cite> object - * depending on entity type. 
- */ - public static TimelineEntityReader createMultipleEntitiesReader( - TimelineReaderContext context, TimelineEntityFilters filters, - TimelineDataToRetrieve dataToRetrieve) { - // currently the types that are handled separate from the generic entity - // table are application, flow run, and flow activity entities - if (TimelineEntityType.YARN_APPLICATION.matches(context.getEntityType())) { - return new ApplicationEntityReader(context, filters, dataToRetrieve); - } else if (TimelineEntityType. - YARN_FLOW_ACTIVITY.matches(context.getEntityType())) { - return new FlowActivityEntityReader(context, filters, dataToRetrieve); - } else if (TimelineEntityType. - YARN_FLOW_RUN.matches(context.getEntityType())) { - return new FlowRunEntityReader(context, filters, dataToRetrieve); - } else { - // assume we're dealing with a generic entity read - return new GenericEntityReader(context, filters, dataToRetrieve, false); - } - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b01514f6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/package-info.java deleted file mode 100644 index 9814d6d..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/package-info.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * Package org.apache.hadoop.yarn.server.timelineservice.storage.reader - * contains classes used to read entities from backend based on query type. - */ -@InterfaceAudience.Private -@InterfaceStability.Unstable -package org.apache.hadoop.yarn.server.timelineservice.storage.reader; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/b01514f6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestKeyConverters.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestKeyConverters.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestKeyConverters.java deleted file mode 100644 index 58df970..0000000 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestKeyConverters.java +++ /dev/null @@ -1,130 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.yarn.server.timelineservice.storage.common; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; - -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.junit.Test; - -public class TestKeyConverters { - - @Test - public void testAppIdKeyConverter() { - AppIdKeyConverter appIdKeyConverter = new AppIdKeyConverter(); - long currentTs = System.currentTimeMillis(); - ApplicationId appId1 = ApplicationId.newInstance(currentTs, 1); - ApplicationId appId2 = ApplicationId.newInstance(currentTs, 2); - ApplicationId appId3 = ApplicationId.newInstance(currentTs + 300, 1); - String appIdStr1 = appId1.toString(); - String appIdStr2 = appId2.toString(); - String appIdStr3 = appId3.toString(); - byte[] appIdBytes1 = appIdKeyConverter.encode(appIdStr1); - byte[] appIdBytes2 = appIdKeyConverter.encode(appIdStr2); - byte[] appIdBytes3 = appIdKeyConverter.encode(appIdStr3); - // App ids' should be encoded in a manner wherein descending order - // is maintained. 
- assertTrue( - "Ordering of app ids' is incorrect", - Bytes.compareTo(appIdBytes1, appIdBytes2) > 0 - && Bytes.compareTo(appIdBytes1, appIdBytes3) > 0 - && Bytes.compareTo(appIdBytes2, appIdBytes3) > 0); - String decodedAppId1 = appIdKeyConverter.decode(appIdBytes1); - String decodedAppId2 = appIdKeyConverter.decode(appIdBytes2); - String decodedAppId3 = appIdKeyConverter.decode(appIdBytes3); - assertTrue("Decoded app id is not same as the app id encoded", - appIdStr1.equals(decodedAppId1)); - assertTrue("Decoded app id is not same as the app id encoded", - appIdStr2.equals(decodedAppId2)); - assertTrue("Decoded app id is not same as the app id encoded", - appIdStr3.equals(decodedAppId3)); - } - - @Test - public void testEventColumnNameConverter() { - String eventId = "=foo_=eve=nt="; - byte[] valSepBytes = Bytes.toBytes(Separator.VALUES.getValue()); - byte[] maxByteArr = - Bytes.createMaxByteArray(Bytes.SIZEOF_LONG - valSepBytes.length); - byte[] ts = Bytes.add(valSepBytes, maxByteArr); - Long eventTs = Bytes.toLong(ts); - byte[] byteEventColName = - new EventColumnName(eventId, eventTs, null).getColumnQualifier(); - KeyConverter<EventColumnName> eventColumnNameConverter = - new EventColumnNameConverter(); - EventColumnName eventColName = - eventColumnNameConverter.decode(byteEventColName); - assertEquals(eventId, eventColName.getId()); - assertEquals(eventTs, eventColName.getTimestamp()); - assertNull(eventColName.getInfoKey()); - - String infoKey = "f=oo_event_in=fo=_key"; - byteEventColName = - new EventColumnName(eventId, eventTs, infoKey).getColumnQualifier(); - eventColName = eventColumnNameConverter.decode(byteEventColName); - assertEquals(eventId, eventColName.getId()); - assertEquals(eventTs, eventColName.getTimestamp()); - assertEquals(infoKey, eventColName.getInfoKey()); - } - - @Test - public void testLongKeyConverter() { - LongKeyConverter longKeyConverter = new LongKeyConverter(); - confirmLongKeyConverter(longKeyConverter, Long.MIN_VALUE); - 
confirmLongKeyConverter(longKeyConverter, -1234567890L); - confirmLongKeyConverter(longKeyConverter, -128L); - confirmLongKeyConverter(longKeyConverter, -127L); - confirmLongKeyConverter(longKeyConverter, -1L); - confirmLongKeyConverter(longKeyConverter, 0L); - confirmLongKeyConverter(longKeyConverter, 1L); - confirmLongKeyConverter(longKeyConverter, 127L); - confirmLongKeyConverter(longKeyConverter, 128L); - confirmLongKeyConverter(longKeyConverter, 1234567890L); - confirmLongKeyConverter(longKeyConverter, Long.MAX_VALUE); - } - - private void confirmLongKeyConverter(LongKeyConverter longKeyConverter, - Long testValue) { - Long decoded = longKeyConverter.decode(longKeyConverter.encode(testValue)); - assertEquals(testValue, decoded); - } - - @Test - public void testStringKeyConverter() { - StringKeyConverter stringKeyConverter = new StringKeyConverter(); - String phrase = "QuackAttack now!"; - - for (int i = 0; i < phrase.length(); i++) { - String sub = phrase.substring(i, phrase.length()); - confirmStrignKeyConverter(stringKeyConverter, sub); - confirmStrignKeyConverter(stringKeyConverter, sub + sub); - } - } - - private void confirmStrignKeyConverter(StringKeyConverter stringKeyConverter, - String testValue) { - String decoded = - stringKeyConverter.decode(stringKeyConverter.encode(testValue)); - assertEquals(testValue, decoded); - } - -} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org