http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterUtils.java new file mode 100644 index 0000000..2b98eec --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterUtils.java @@ -0,0 +1,313 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.reader.filter; + +import java.io.IOException; +import java.util.HashSet; +import java.util.Set; + +import org.apache.hadoop.hbase.filter.BinaryComparator; +import org.apache.hadoop.hbase.filter.BinaryPrefixComparator; +import org.apache.hadoop.hbase.filter.FamilyFilter; +import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.filter.FilterList; +import org.apache.hadoop.hbase.filter.FilterList.Operator; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.Column; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix; +import org.apache.hadoop.hbase.filter.QualifierFilter; +import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Set of utility methods used by timeline filter classes. + */ +public final class TimelineFilterUtils { + + private static final Logger LOG = + LoggerFactory.getLogger(TimelineFilterUtils.class); + + private TimelineFilterUtils() { + } + + /** + * Returns the equivalent HBase filter list's {@link Operator}. + * + * @param op timeline filter list operator. + * @return HBase filter list's Operator. 
+ */ + private static Operator getHBaseOperator(TimelineFilterList.Operator op) { + switch (op) { + case AND: + return Operator.MUST_PASS_ALL; + case OR: + return Operator.MUST_PASS_ONE; + default: + throw new IllegalArgumentException("Invalid operator"); + } + } + + /** + * Returns the equivalent HBase compare filter's {@link CompareOp}. + * + * @param op timeline compare op. + * @return HBase compare filter's CompareOp. + */ + private static CompareOp getHBaseCompareOp( + TimelineCompareOp op) { + switch (op) { + case LESS_THAN: + return CompareOp.LESS; + case LESS_OR_EQUAL: + return CompareOp.LESS_OR_EQUAL; + case EQUAL: + return CompareOp.EQUAL; + case NOT_EQUAL: + return CompareOp.NOT_EQUAL; + case GREATER_OR_EQUAL: + return CompareOp.GREATER_OR_EQUAL; + case GREATER_THAN: + return CompareOp.GREATER; + default: + throw new IllegalArgumentException("Invalid compare operator"); + } + } + + /** + * Converts a {@link TimelinePrefixFilter} to an equivalent HBase + * {@link QualifierFilter}. + * @param colPrefix column prefix used to encode the column qualifier. + * @param filter timeline prefix filter to be converted. + * @return a {@link QualifierFilter} object + */ + private static <T extends BaseTable<T>> Filter createHBaseColQualPrefixFilter( + ColumnPrefix<T> colPrefix, TimelinePrefixFilter filter) { + return new QualifierFilter(getHBaseCompareOp(filter.getCompareOp()), + new BinaryPrefixComparator( + colPrefix.getColumnPrefixBytes(filter.getPrefix()))); + } + + /** + * Create an HBase {@link QualifierFilter} for the passed column prefix and + * compare op. + * + * @param <T> Describes the type of column prefix. + * @param compareOp compare op. + * @param columnPrefix column prefix. + * @return a column qualifier filter. + */ + public static <T extends BaseTable<T>> Filter createHBaseQualifierFilter( + CompareOp compareOp, ColumnPrefix<T> columnPrefix) { + return new QualifierFilter(compareOp, + new BinaryPrefixComparator( + columnPrefix.getColumnPrefixBytes(""))); + } + + /** + * Create filters for confs or metrics to retrieve. This list includes a + * configs/metrics family filter and relevant filters for confs/metrics to + * retrieve, if present. + * + * @param <T> Describes the type of column prefix. + * @param confsOrMetricToRetrieve configs/metrics to retrieve. + * @param columnFamily config or metric column family. + * @param columnPrefix config or metric column prefix. + * @return a filter list. + * @throws IOException if any problem occurs while creating the filters. + */ + public static <T extends BaseTable<T>> Filter + createFilterForConfsOrMetricsToRetrieve( + TimelineFilterList confsOrMetricToRetrieve, ColumnFamily<T> columnFamily, + ColumnPrefix<T> columnPrefix) throws IOException { + Filter familyFilter = new FamilyFilter(CompareOp.EQUAL, + new BinaryComparator(columnFamily.getBytes())); + if (confsOrMetricToRetrieve != null && + !confsOrMetricToRetrieve.getFilterList().isEmpty()) { + // If confsOrMetricsToRetrieve is specified, create a filter list based + // on it and the family filter. + FilterList filter = new FilterList(familyFilter); + filter.addFilter( + createHBaseFilterList(columnPrefix, confsOrMetricToRetrieve)); + return filter; + } else { + // Only the family filter needs to be added. + return familyFilter; + } + }
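A minimal sketch of how this helper might be wired into a read path (not taken from this patch; the prefix string, the Get, and the row key are illustrative assumptions, and TimelinePrefixFilter is assumed to take a compare op and a prefix):

    // Fetch only config columns whose names start with "mapreduce.",
    // scoped to the configs column family of the application table.
    TimelineFilterList confsToRetrieve = new TimelineFilterList(
        TimelineFilterList.Operator.OR,
        new TimelinePrefixFilter(TimelineCompareOp.EQUAL, "mapreduce."));
    Filter confFilter =
        TimelineFilterUtils.createFilterForConfsOrMetricsToRetrieve(
            confsToRetrieve, ApplicationColumnFamily.CONFIGS,
            ApplicationColumnPrefix.CONFIG);
    Get get = new Get(rowKey); // rowKey built from an ApplicationRowKey
    get.setFilter(confFilter);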
+ + /** + * Creates two HBase {@link SingleColumnValueFilter} filters for the specified + * value range, represented by a start and an end value, and wraps them inside + * a filter list. Start and end value should not be null. + * + * @param <T> Describes the type of column prefix. + * @param column Column for which the single column value filters are to be + * created. + * @param startValue Start value. + * @param endValue End value. + * @return two single column value filters wrapped in a filter list. + * @throws IOException if any problem is encountered while encoding values. + */ + public static <T extends BaseTable<T>> FilterList + createSingleColValueFiltersByRange(Column<T> column, + Object startValue, Object endValue) throws IOException { + FilterList list = new FilterList(); + Filter singleColValFilterStart = createHBaseSingleColValueFilter( + column.getColumnFamilyBytes(), column.getColumnQualifierBytes(), + column.getValueConverter().encodeValue(startValue), + CompareOp.GREATER_OR_EQUAL, true); + list.addFilter(singleColValFilterStart); + + Filter singleColValFilterEnd = createHBaseSingleColValueFilter( + column.getColumnFamilyBytes(), column.getColumnQualifierBytes(), + column.getValueConverter().encodeValue(endValue), + CompareOp.LESS_OR_EQUAL, true); + list.addFilter(singleColValFilterEnd); + return list; + } + + /** + * Creates an HBase {@link SingleColumnValueFilter} for the specified column. + * @param <T> Describes the type of column prefix. + * @param column Column whose value is to be filtered. + * @param value Value to be filtered. + * @param op Compare operator. + * @return a {@link SingleColumnValueFilter} object. + * @throws IOException if any problem occurs while encoding the value. + */ + public static <T extends BaseTable<T>> Filter + createHBaseSingleColValueFilter(Column<T> column, + Object value, CompareOp op) throws IOException { + Filter singleColValFilter = createHBaseSingleColValueFilter( + column.getColumnFamilyBytes(), column.getColumnQualifierBytes(), + column.getValueConverter().encodeValue(value), op, true); + return singleColValFilter; + } + + /** + * Creates an HBase {@link SingleColumnValueFilter}. + * + * @param columnFamily Column Family represented as bytes. + * @param columnQualifier Column Qualifier represented as bytes. + * @param value Value. + * @param compareOp Compare operator. + * @param filterIfMissing This flag decides if we should filter the row if the + * specified column is missing. This is based on the filter's keyMustExist + * field. + * @return a {@link SingleColumnValueFilter} object + * @throws IOException if any problem occurs while encoding the value. + */ + private static SingleColumnValueFilter createHBaseSingleColValueFilter( + byte[] columnFamily, byte[] columnQualifier, byte[] value, + CompareOp compareOp, boolean filterIfMissing) throws IOException { + SingleColumnValueFilter singleColValFilter = + new SingleColumnValueFilter(columnFamily, columnQualifier, compareOp, + new BinaryComparator(value)); + singleColValFilter.setLatestVersionOnly(true); + singleColValFilter.setFilterIfMissing(filterIfMissing); + return singleColValFilter; + }
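As an illustrative use of the range helper above (not from this patch; the scan and the time bounds are assumed), a reader can keep only flow runs whose minimum start time falls within a window:

    // Keep only rows whose MIN_START_TIME column lies in
    // [createdTimeBegin, createdTimeEnd]; both bounds must be non-null.
    long createdTimeBegin = 1425026900000L;
    long createdTimeEnd = 1425026901000L;
    FilterList createdTimeRange =
        TimelineFilterUtils.createSingleColValueFiltersByRange(
            FlowRunColumn.MIN_START_TIME, createdTimeBegin, createdTimeEnd);
    scan.setFilter(createdTimeRange);
    // Because both wrapped SingleColumnValueFilters are created with
    // filterIfMissing set to true, rows lacking the column are dropped
    // instead of passing through unchecked.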
+ + /** + * Fetch columns from a filter list containing exists and multivalue equality + * filters. This is done to fetch only the required columns from the back-end + * and then match event filters or relationships in the reader. + * + * @param filterList filter list. + * @return set of columns. + */ + public static Set<String> fetchColumnsFromFilterList( + TimelineFilterList filterList) { + Set<String> strSet = new HashSet<String>(); + for (TimelineFilter filter : filterList.getFilterList()) { + switch(filter.getFilterType()) { + case LIST: + strSet.addAll(fetchColumnsFromFilterList((TimelineFilterList)filter)); + break; + case KEY_VALUES: + strSet.add(((TimelineKeyValuesFilter)filter).getKey()); + break; + case EXISTS: + strSet.add(((TimelineExistsFilter)filter).getValue()); + break; + default: + LOG.info("Unexpected filter type " + filter.getFilterType()); + break; + } + } + return strSet; + } + + /** + * Creates an equivalent HBase {@link FilterList} from a + * {@link TimelineFilterList} while converting the different timeline filters + * (of type {@link TimelineFilter}) into their equivalent HBase filters. + * + * @param <T> Describes the type of column prefix. + * @param colPrefix column prefix which will be used for conversion. + * @param filterList timeline filter list which has to be converted. + * @return A {@link FilterList} object. + * @throws IOException if any problem occurs while creating the filter list. + */ + public static <T extends BaseTable<T>> FilterList createHBaseFilterList( + ColumnPrefix<T> colPrefix, + TimelineFilterList filterList) throws IOException { + FilterList list = + new FilterList(getHBaseOperator(filterList.getOperator())); + for (TimelineFilter filter : filterList.getFilterList()) { + switch(filter.getFilterType()) { + case LIST: + list.addFilter(createHBaseFilterList(colPrefix, + (TimelineFilterList)filter)); + break; + case PREFIX: + list.addFilter(createHBaseColQualPrefixFilter(colPrefix, + (TimelinePrefixFilter)filter)); + break; + case COMPARE: + TimelineCompareFilter compareFilter = (TimelineCompareFilter)filter; + list.addFilter( + createHBaseSingleColValueFilter( + colPrefix.getColumnFamilyBytes(), + colPrefix.getColumnPrefixBytes(compareFilter.getKey()), + colPrefix.getValueConverter(). + encodeValue(compareFilter.getValue()), + getHBaseCompareOp(compareFilter.getCompareOp()), + compareFilter.getKeyMustExist())); + break; + case KEY_VALUE: + TimelineKeyValueFilter kvFilter = (TimelineKeyValueFilter)filter; + list.addFilter( + createHBaseSingleColValueFilter( + colPrefix.getColumnFamilyBytes(), + colPrefix.getColumnPrefixBytes(kvFilter.getKey()), + colPrefix.getValueConverter().encodeValue(kvFilter.getValue()), + getHBaseCompareOp(kvFilter.getCompareOp()), + kvFilter.getKeyMustExist())); + break; + default: + LOG.info("Unexpected filter type " + filter.getFilterType()); + break; + } + } + return list; + } +}
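To make the recursive conversion concrete, here is a hedged sketch (not from this patch; the filter keys and prefixes are made up, and TimelineCompareFilter is assumed to take an op, a key and a value) of a nested timeline filter list and its translation, where AND maps to FilterList.Operator.MUST_PASS_ALL and OR to MUST_PASS_ONE:

    // (metricX >= 100 AND qualifiers with prefix "metricY") OR prefix "custom".
    TimelineFilterList inner = new TimelineFilterList(
        TimelineFilterList.Operator.AND,
        new TimelineCompareFilter(
            TimelineCompareOp.GREATER_OR_EQUAL, "metricX", 100),
        new TimelinePrefixFilter(TimelineCompareOp.EQUAL, "metricY"));
    TimelineFilterList metricFilters = new TimelineFilterList(
        TimelineFilterList.Operator.OR,
        inner,
        new TimelinePrefixFilter(TimelineCompareOp.EQUAL, "custom"));
    FilterList hbaseFilterList = TimelineFilterUtils.createHBaseFilterList(
        EntityColumnPrefix.METRIC, metricFilters);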
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/package-info.java new file mode 100644 index 0000000..f7c0705 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/package-info.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Package org.apache.hadoop.yarn.server.timelineservice.reader.filter stores + * timeline filter implementations.
+ */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +package org.apache.hadoop.yarn.server.timelineservice.reader.filter; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java new file mode 100644 index 0000000..1ebfab2 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage; + + +import java.io.IOException; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve; +import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters; +import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils; +import org.apache.hadoop.yarn.server.timelineservice.storage.reader.EntityTypeReader; +import org.apache.hadoop.yarn.server.timelineservice.storage.reader.TimelineEntityReader; +import org.apache.hadoop.yarn.server.timelineservice.storage.reader.TimelineEntityReaderFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * HBase-based implementation for {@link TimelineReader}.
+ */ +public class HBaseTimelineReaderImpl + extends AbstractService implements TimelineReader { + + private static final Logger LOG = LoggerFactory + .getLogger(HBaseTimelineReaderImpl.class); + + private Configuration hbaseConf = null; + private Connection conn; + + public HBaseTimelineReaderImpl() { + super(HBaseTimelineReaderImpl.class.getName()); + } + + @Override + public void serviceInit(Configuration conf) throws Exception { + super.serviceInit(conf); + hbaseConf = HBaseTimelineStorageUtils.getTimelineServiceHBaseConf(conf); + conn = ConnectionFactory.createConnection(hbaseConf); + } + + @Override + protected void serviceStop() throws Exception { + if (conn != null) { + LOG.info("closing the hbase Connection"); + conn.close(); + } + super.serviceStop(); + } + + @Override + public TimelineEntity getEntity(TimelineReaderContext context, + TimelineDataToRetrieve dataToRetrieve) throws IOException { + TimelineEntityReader reader = + TimelineEntityReaderFactory.createSingleEntityReader(context, + dataToRetrieve); + return reader.readEntity(hbaseConf, conn); + } + + @Override + public Set<TimelineEntity> getEntities(TimelineReaderContext context, + TimelineEntityFilters filters, TimelineDataToRetrieve dataToRetrieve) + throws IOException { + TimelineEntityReader reader = + TimelineEntityReaderFactory.createMultipleEntitiesReader(context, + filters, dataToRetrieve); + return reader.readEntities(hbaseConf, conn); + } + + @Override + public Set<String> getEntityTypes(TimelineReaderContext context) + throws IOException { + EntityTypeReader reader = new EntityTypeReader(context); + return reader.readEntityTypes(hbaseConf, conn); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java new file mode 100644 index 0000000..027505b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java @@ -0,0 +1,611 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage; + +import java.io.IOException; +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse; +import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants; +import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext; +import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumn; +import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey; +import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTableRW; +import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowColumnPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey; +import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTableRW; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnRWHelper; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.EventColumnName; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongKeyConverter; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.StringKeyConverter; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator; +import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn; +import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey; +import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTableRW; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationCompactionDimension; +import 
org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationOperation; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityColumnPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKey; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTableRW; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumn; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKey; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTableRW; +import org.apache.hadoop.yarn.server.timelineservice.storage.subapplication.SubApplicationColumn; +import org.apache.hadoop.yarn.server.timelineservice.storage.subapplication.SubApplicationColumnPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.subapplication.SubApplicationRowKey; +import org.apache.hadoop.yarn.server.timelineservice.storage.subapplication.SubApplicationTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.subapplication.SubApplicationTableRW; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This implements an HBase-based backend for storing the timeline entity + * information. + * It writes to multiple tables in the backend. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class HBaseTimelineWriterImpl extends AbstractService implements + TimelineWriter { + + private static final Logger LOG = LoggerFactory + .getLogger(HBaseTimelineWriterImpl.class); + + private Connection conn; + private TypedBufferedMutator<EntityTable> entityTable; + private TypedBufferedMutator<AppToFlowTable> appToFlowTable; + private TypedBufferedMutator<ApplicationTable> applicationTable; + private TypedBufferedMutator<FlowActivityTable> flowActivityTable; + private TypedBufferedMutator<FlowRunTable> flowRunTable; + private TypedBufferedMutator<SubApplicationTable> subApplicationTable; + + /** + * Used to convert string key components to and from storage format. + */ + private final KeyConverter<String> stringKeyConverter = + new StringKeyConverter(); + + /** + * Used to convert Long key components to and from storage format. + */ + private final KeyConverter<Long> longKeyConverter = new LongKeyConverter(); + + private enum Tables { + APPLICATION_TABLE, ENTITY_TABLE, SUBAPPLICATION_TABLE + }; + + public HBaseTimelineWriterImpl() { + super(HBaseTimelineWriterImpl.class.getName()); + } + + /** + * Initializes the HBase connection and the buffered mutators needed to + * write to the backend tables.
+ */ + @Override + protected void serviceInit(Configuration conf) throws Exception { + super.serviceInit(conf); + Configuration hbaseConf = + HBaseTimelineStorageUtils.getTimelineServiceHBaseConf(conf); + conn = ConnectionFactory.createConnection(hbaseConf); + entityTable = new EntityTableRW().getTableMutator(hbaseConf, conn); + appToFlowTable = new AppToFlowTableRW().getTableMutator(hbaseConf, conn); + applicationTable = + new ApplicationTableRW().getTableMutator(hbaseConf, conn); + flowRunTable = new FlowRunTableRW().getTableMutator(hbaseConf, conn); + flowActivityTable = + new FlowActivityTableRW().getTableMutator(hbaseConf, conn); + subApplicationTable = + new SubApplicationTableRW().getTableMutator(hbaseConf, conn); + + UserGroupInformation ugi = UserGroupInformation.isSecurityEnabled() ? + UserGroupInformation.getLoginUser() : + UserGroupInformation.getCurrentUser(); + LOG.info("Initialized HBaseTimelineWriterImpl UGI to " + ugi); + } + + /** + * Stores all of the information in the given TimelineEntities object to the + * timeline store. + */ + @Override + public TimelineWriteResponse write(TimelineCollectorContext context, + TimelineEntities data, UserGroupInformation callerUgi) + throws IOException { + + TimelineWriteResponse putStatus = new TimelineWriteResponse(); + + String clusterId = context.getClusterId(); + String userId = context.getUserId(); + String flowName = context.getFlowName(); + String flowVersion = context.getFlowVersion(); + long flowRunId = context.getFlowRunId(); + String appId = context.getAppId(); + String subApplicationUser = callerUgi.getShortUserName(); + + // defensive coding to avoid NPE during row key construction + if ((flowName == null) || (appId == null) || (clusterId == null) + || (userId == null)) { + LOG.warn("Found null for one of: flowName=" + flowName + " appId=" + appId + + " userId=" + userId + " clusterId=" + clusterId + + " .
Not proceeding with writing to hbase"); + return putStatus; + } + + for (TimelineEntity te : data.getEntities()) { + + // a set can have at most 1 null + if (te == null) { + continue; + } + + // if the entity is the application, the destination is the application + // table + boolean isApplication = ApplicationEntity.isApplicationEntity(te); + byte[] rowKey; + if (isApplication) { + ApplicationRowKey applicationRowKey = + new ApplicationRowKey(clusterId, userId, flowName, flowRunId, + appId); + rowKey = applicationRowKey.getRowKey(); + store(rowKey, te, flowVersion, Tables.APPLICATION_TABLE); + } else { + EntityRowKey entityRowKey = + new EntityRowKey(clusterId, userId, flowName, flowRunId, appId, + te.getType(), te.getIdPrefix(), te.getId()); + rowKey = entityRowKey.getRowKey(); + store(rowKey, te, flowVersion, Tables.ENTITY_TABLE); + } + + if (!isApplication && !userId.equals(subApplicationUser)) { + SubApplicationRowKey subApplicationRowKey = + new SubApplicationRowKey(subApplicationUser, clusterId, + te.getType(), te.getIdPrefix(), te.getId(), userId); + rowKey = subApplicationRowKey.getRowKey(); + store(rowKey, te, flowVersion, Tables.SUBAPPLICATION_TABLE); + } + + if (isApplication) { + TimelineEvent event = + ApplicationEntity.getApplicationEvent(te, + ApplicationMetricsConstants.CREATED_EVENT_TYPE); + FlowRunRowKey flowRunRowKey = + new FlowRunRowKey(clusterId, userId, flowName, flowRunId); + if (event != null) { + onApplicationCreated(flowRunRowKey, clusterId, appId, userId, + flowVersion, te, event.getTimestamp()); + } + // if it's an application entity, store metrics + storeFlowMetricsAppRunning(flowRunRowKey, appId, te); + // if application has finished, store it's finish time and write final + // values of all metrics + event = ApplicationEntity.getApplicationEvent(te, + ApplicationMetricsConstants.FINISHED_EVENT_TYPE); + if (event != null) { + onApplicationFinished(flowRunRowKey, flowVersion, appId, te, + event.getTimestamp()); + } + } + } + return putStatus; + } + + private void onApplicationCreated(FlowRunRowKey flowRunRowKey, + String clusterId, String appId, String userId, String flowVersion, + TimelineEntity te, long appCreatedTimeStamp) + throws IOException { + + String flowName = flowRunRowKey.getFlowName(); + Long flowRunId = flowRunRowKey.getFlowRunId(); + + // store in App to flow table + AppToFlowRowKey appToFlowRowKey = new AppToFlowRowKey(appId); + byte[] rowKey = appToFlowRowKey.getRowKey(); + ColumnRWHelper.store(rowKey, appToFlowTable, + AppToFlowColumnPrefix.FLOW_NAME, clusterId, null, flowName); + ColumnRWHelper.store(rowKey, appToFlowTable, + AppToFlowColumnPrefix.FLOW_RUN_ID, clusterId, null, flowRunId); + ColumnRWHelper.store(rowKey, appToFlowTable, AppToFlowColumnPrefix.USER_ID, + clusterId, null, userId); + + // store in flow run table + storeAppCreatedInFlowRunTable(flowRunRowKey, appId, te); + + // store in flow activity table + byte[] flowActivityRowKeyBytes = + new FlowActivityRowKey(flowRunRowKey.getClusterId(), + appCreatedTimeStamp, flowRunRowKey.getUserId(), flowName) + .getRowKey(); + byte[] qualifier = longKeyConverter.encode(flowRunRowKey.getFlowRunId()); + ColumnRWHelper.store(flowActivityRowKeyBytes, flowActivityTable, + FlowActivityColumnPrefix.RUN_ID, qualifier, null, flowVersion, + AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId)); + } + + /* + * updates the {@link FlowRunTable} with Application Created information + */ + private void storeAppCreatedInFlowRunTable(FlowRunRowKey flowRunRowKey, + String appId, 
+ + private void onApplicationCreated(FlowRunRowKey flowRunRowKey, + String clusterId, String appId, String userId, String flowVersion, + TimelineEntity te, long appCreatedTimeStamp) + throws IOException { + + String flowName = flowRunRowKey.getFlowName(); + Long flowRunId = flowRunRowKey.getFlowRunId(); + + // store in App to flow table + AppToFlowRowKey appToFlowRowKey = new AppToFlowRowKey(appId); + byte[] rowKey = appToFlowRowKey.getRowKey(); + ColumnRWHelper.store(rowKey, appToFlowTable, + AppToFlowColumnPrefix.FLOW_NAME, clusterId, null, flowName); + ColumnRWHelper.store(rowKey, appToFlowTable, + AppToFlowColumnPrefix.FLOW_RUN_ID, clusterId, null, flowRunId); + ColumnRWHelper.store(rowKey, appToFlowTable, AppToFlowColumnPrefix.USER_ID, + clusterId, null, userId); + + // store in flow run table + storeAppCreatedInFlowRunTable(flowRunRowKey, appId, te); + + // store in flow activity table + byte[] flowActivityRowKeyBytes = + new FlowActivityRowKey(flowRunRowKey.getClusterId(), + appCreatedTimeStamp, flowRunRowKey.getUserId(), flowName) + .getRowKey(); + byte[] qualifier = longKeyConverter.encode(flowRunRowKey.getFlowRunId()); + ColumnRWHelper.store(flowActivityRowKeyBytes, flowActivityTable, + FlowActivityColumnPrefix.RUN_ID, qualifier, null, flowVersion, + AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId)); + } + + /* + * Updates the {@link FlowRunTable} with application created information. + */ + private void storeAppCreatedInFlowRunTable(FlowRunRowKey flowRunRowKey, + String appId, TimelineEntity te) throws IOException { + byte[] rowKey = flowRunRowKey.getRowKey(); + ColumnRWHelper.store(rowKey, flowRunTable, FlowRunColumn.MIN_START_TIME, + null, te.getCreatedTime(), + AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId)); + } + + + /* + * Updates the {@link FlowRunTable} and {@link FlowActivityTable} when an + * application has finished. + */ + private void onApplicationFinished(FlowRunRowKey flowRunRowKey, + String flowVersion, String appId, TimelineEntity te, + long appFinishedTimeStamp) throws IOException { + // store in flow run table + storeAppFinishedInFlowRunTable(flowRunRowKey, appId, te, + appFinishedTimeStamp); + + // indicate in the flow activity table that the app has finished + byte[] rowKey = + new FlowActivityRowKey(flowRunRowKey.getClusterId(), + appFinishedTimeStamp, flowRunRowKey.getUserId(), + flowRunRowKey.getFlowName()).getRowKey(); + byte[] qualifier = longKeyConverter.encode(flowRunRowKey.getFlowRunId()); + ColumnRWHelper.store(rowKey, flowActivityTable, + FlowActivityColumnPrefix.RUN_ID, qualifier, null, flowVersion, + AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId)); + } + + /* + * Updates the {@link FlowRunTable} with application finished information. + */ + private void storeAppFinishedInFlowRunTable(FlowRunRowKey flowRunRowKey, + String appId, TimelineEntity te, long appFinishedTimeStamp) + throws IOException { + byte[] rowKey = flowRunRowKey.getRowKey(); + Attribute attributeAppId = + AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId); + ColumnRWHelper.store(rowKey, flowRunTable, FlowRunColumn.MAX_END_TIME, + null, appFinishedTimeStamp, attributeAppId); + + // store the final value of metrics since the application has finished + Set<TimelineMetric> metrics = te.getMetrics(); + if (metrics != null) { + storeFlowMetrics(rowKey, metrics, attributeAppId, + AggregationOperation.SUM_FINAL.getAttribute()); + } + } + + /* + * Updates the {@link FlowRunTable} with application metrics. + */ + private void storeFlowMetricsAppRunning(FlowRunRowKey flowRunRowKey, + String appId, TimelineEntity te) throws IOException { + Set<TimelineMetric> metrics = te.getMetrics(); + if (metrics != null) { + byte[] rowKey = flowRunRowKey.getRowKey(); + storeFlowMetrics(rowKey, metrics, + AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId), + AggregationOperation.SUM.getAttribute()); + } + } + + private void storeFlowMetrics(byte[] rowKey, Set<TimelineMetric> metrics, + Attribute... attributes) throws IOException { + for (TimelineMetric metric : metrics) { + byte[] metricColumnQualifier = stringKeyConverter.encode(metric.getId()); + Map<Long, Number> timeseries = metric.getValues(); + for (Map.Entry<Long, Number> timeseriesEntry : timeseries.entrySet()) { + Long timestamp = timeseriesEntry.getKey(); + ColumnRWHelper.store(rowKey, flowRunTable, FlowRunColumnPrefix.METRIC, + metricColumnQualifier, timestamp, timeseriesEntry.getValue(), + attributes); + } + } + } + + /** + * Stores the relations from the {@linkplain TimelineEntity} object.
+ */ + private <T extends BaseTable<T>> void storeRelations(byte[] rowKey, + Map<String, Set<String>> connectedEntities, ColumnPrefix<T> columnPrefix, + TypedBufferedMutator<T> table) throws IOException { + if (connectedEntities != null) { + for (Map.Entry<String, Set<String>> connectedEntity : connectedEntities + .entrySet()) { + // id3?id4?id5 + String compoundValue = + Separator.VALUES.joinEncoded(connectedEntity.getValue()); + ColumnRWHelper.store(rowKey, table, columnPrefix, + stringKeyConverter.encode(connectedEntity.getKey()), + null, compoundValue); + } + } + } + + /** + * Stores information from the {@linkplain TimelineEntity} object. + */ + private void store(byte[] rowKey, TimelineEntity te, + String flowVersion, + Tables table) throws IOException { + switch (table) { + case APPLICATION_TABLE: + ColumnRWHelper.store(rowKey, applicationTable, + ApplicationColumn.ID, null, te.getId()); + ColumnRWHelper.store(rowKey, applicationTable, + ApplicationColumn.CREATED_TIME, null, te.getCreatedTime()); + ColumnRWHelper.store(rowKey, applicationTable, + ApplicationColumn.FLOW_VERSION, null, flowVersion); + storeInfo(rowKey, te.getInfo(), flowVersion, ApplicationColumnPrefix.INFO, + applicationTable); + storeMetrics(rowKey, te.getMetrics(), ApplicationColumnPrefix.METRIC, + applicationTable); + storeEvents(rowKey, te.getEvents(), ApplicationColumnPrefix.EVENT, + applicationTable); + storeConfig(rowKey, te.getConfigs(), ApplicationColumnPrefix.CONFIG, + applicationTable); + storeRelations(rowKey, te.getIsRelatedToEntities(), + ApplicationColumnPrefix.IS_RELATED_TO, applicationTable); + storeRelations(rowKey, te.getRelatesToEntities(), + ApplicationColumnPrefix.RELATES_TO, applicationTable); + break; + case ENTITY_TABLE: + ColumnRWHelper.store(rowKey, entityTable, + EntityColumn.ID, null, te.getId()); + ColumnRWHelper.store(rowKey, entityTable, + EntityColumn.TYPE, null, te.getType()); + ColumnRWHelper.store(rowKey, entityTable, + EntityColumn.CREATED_TIME, null, te.getCreatedTime()); + ColumnRWHelper.store(rowKey, entityTable, + EntityColumn.FLOW_VERSION, null, flowVersion); + storeInfo(rowKey, te.getInfo(), flowVersion, EntityColumnPrefix.INFO, + entityTable); + storeMetrics(rowKey, te.getMetrics(), EntityColumnPrefix.METRIC, + entityTable); + storeEvents(rowKey, te.getEvents(), EntityColumnPrefix.EVENT, + entityTable); + storeConfig(rowKey, te.getConfigs(), EntityColumnPrefix.CONFIG, + entityTable); + storeRelations(rowKey, te.getIsRelatedToEntities(), + EntityColumnPrefix.IS_RELATED_TO, entityTable); + storeRelations(rowKey, te.getRelatesToEntities(), + EntityColumnPrefix.RELATES_TO, entityTable); + break; + case SUBAPPLICATION_TABLE: + ColumnRWHelper.store(rowKey, subApplicationTable, SubApplicationColumn.ID, + null, te.getId()); + ColumnRWHelper.store(rowKey, subApplicationTable, + SubApplicationColumn.TYPE, null, te.getType()); + ColumnRWHelper.store(rowKey, subApplicationTable, + SubApplicationColumn.CREATED_TIME, null, te.getCreatedTime()); + ColumnRWHelper.store(rowKey, subApplicationTable, + SubApplicationColumn.FLOW_VERSION, null, flowVersion); + storeInfo(rowKey, te.getInfo(), flowVersion, + SubApplicationColumnPrefix.INFO, subApplicationTable); + storeMetrics(rowKey, te.getMetrics(), SubApplicationColumnPrefix.METRIC, + subApplicationTable); + storeEvents(rowKey, te.getEvents(), SubApplicationColumnPrefix.EVENT, + subApplicationTable); + storeConfig(rowKey, te.getConfigs(), SubApplicationColumnPrefix.CONFIG, + subApplicationTable); + storeRelations(rowKey, 
te.getIsRelatedToEntities(), + SubApplicationColumnPrefix.IS_RELATED_TO, subApplicationTable); + storeRelations(rowKey, te.getRelatesToEntities(), + SubApplicationColumnPrefix.RELATES_TO, subApplicationTable); + break; + default: + LOG.info("Invalid table name provided."); + break; + } + } + + /** + * Stores the info key-value pairs from the {@linkplain TimelineEntity} + * object. + */ + private <T extends BaseTable<T>> void storeInfo(byte[] rowKey, + Map<String, Object> info, String flowVersion, + ColumnPrefix<T> columnPrefix, TypedBufferedMutator<T> table) + throws IOException { + if (info != null) { + for (Map.Entry<String, Object> entry : info.entrySet()) { + ColumnRWHelper.store(rowKey, table, columnPrefix, + stringKeyConverter.encode(entry.getKey()), null, entry.getValue()); + } + } + } + + /** + * Stores the config information from the {@linkplain TimelineEntity} object. + */ + private <T extends BaseTable<T>> void storeConfig( + byte[] rowKey, Map<String, String> config, + ColumnPrefix<T> columnPrefix, TypedBufferedMutator<T> table) + throws IOException { + if (config != null) { + for (Map.Entry<String, String> entry : config.entrySet()) { + byte[] configKey = stringKeyConverter.encode(entry.getKey()); + ColumnRWHelper.store(rowKey, table, columnPrefix, configKey, + null, entry.getValue()); + } + } + } + + /** + * Stores the {@linkplain TimelineMetric} information from the + * {@linkplain TimelineEntity} object. + */ + private <T extends BaseTable<T>> void storeMetrics( + byte[] rowKey, Set<TimelineMetric> metrics, + ColumnPrefix<T> columnPrefix, TypedBufferedMutator<T> table) + throws IOException { + if (metrics != null) { + for (TimelineMetric metric : metrics) { + byte[] metricColumnQualifier = + stringKeyConverter.encode(metric.getId()); + Map<Long, Number> timeseries = metric.getValues(); + for (Map.Entry<Long, Number> timeseriesEntry : timeseries.entrySet()) { + Long timestamp = timeseriesEntry.getKey(); + ColumnRWHelper.store(rowKey, table, columnPrefix, + metricColumnQualifier, timestamp, timeseriesEntry.getValue()); + } + } + } + } + + /** + * Stores the events from the {@linkplain TimelineEntity} object. + */ + private <T extends BaseTable<T>> void storeEvents( + byte[] rowKey, Set<TimelineEvent> events, + ColumnPrefix<T> columnPrefix, TypedBufferedMutator<T> table) + throws IOException { + if (events != null) { + for (TimelineEvent event : events) { + if (event != null) { + String eventId = event.getId(); + if (eventId != null) { + long eventTimestamp = event.getTimestamp(); + // if the timestamp is not set, use the current timestamp + if (eventTimestamp == TimelineEvent.INVALID_TIMESTAMP) { + LOG.warn("timestamp is not set for event " + eventId + + "!
Using the current timestamp"); + eventTimestamp = System.currentTimeMillis(); + } + Map<String, Object> eventInfo = event.getInfo(); + if ((eventInfo == null) || (eventInfo.size() == 0)) { + byte[] columnQualifierBytes = + new EventColumnName(eventId, eventTimestamp, null) + .getColumnQualifier(); + ColumnRWHelper.store(rowKey, table, columnPrefix, + columnQualifierBytes, null, Separator.EMPTY_BYTES); + } else { + for (Map.Entry<String, Object> info : eventInfo.entrySet()) { + // eventId=infoKey + byte[] columnQualifierBytes = + new EventColumnName(eventId, eventTimestamp, info.getKey()) + .getColumnQualifier(); + ColumnRWHelper.store(rowKey, table, columnPrefix, + columnQualifierBytes, null, info.getValue()); + } // for info: eventInfo + } + } + } + } // event : events + } + } + + /* + * (non-Javadoc) + * + * @see + * org.apache.hadoop.yarn.server.timelineservice.storage + * .TimelineWriter#aggregate + * (org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity, + * org.apache + * .hadoop.yarn.server.timelineservice.storage.TimelineAggregationTrack) + */ + @Override + public TimelineWriteResponse aggregate(TimelineEntity data, + TimelineAggregationTrack track) throws IOException { + return null; + } + + /* + * (non-Javadoc) + * + * @see + * org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter#flush + * () + */ + @Override + public void flush() throws IOException { + // flush all buffered mutators + entityTable.flush(); + appToFlowTable.flush(); + applicationTable.flush(); + flowRunTable.flush(); + flowActivityTable.flush(); + subApplicationTable.flush(); + } + + /** + * close the hbase connections The close APIs perform flushing and release any + * resources held. + */ + @Override + protected void serviceStop() throws Exception { + if (entityTable != null) { + LOG.info("closing the entity table"); + // The close API performs flushing and releases any resources held + entityTable.close(); + } + if (appToFlowTable != null) { + LOG.info("closing the app_flow table"); + // The close API performs flushing and releases any resources held + appToFlowTable.close(); + } + if (applicationTable != null) { + LOG.info("closing the application table"); + applicationTable.close(); + } + if (flowRunTable != null) { + LOG.info("closing the flow run table"); + // The close API performs flushing and releases any resources held + flowRunTable.close(); + } + if (flowActivityTable != null) { + LOG.info("closing the flowActivityTable table"); + // The close API performs flushing and releases any resources held + flowActivityTable.close(); + } + if (subApplicationTable != null) { + subApplicationTable.close(); + } + if (conn != null) { + LOG.info("closing the hbase Connection"); + conn.close(); + } + super.serviceStop(); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java new file mode 100644 index 0000000..e9e4770 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java @@ -0,0 +1,368 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; +import org.apache.commons.cli.PosixParser; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.util.GenericOptionsParser; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTableRW; +import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTableRW; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils; +import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTableRW; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTableRW; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTableRW; +import org.apache.hadoop.yarn.server.timelineservice.storage.subapplication.SubApplicationTableRW; + +import com.google.common.annotations.VisibleForTesting; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This creates the schema for an HBase-based backend for storing application + * timeline information.
+ */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public final class TimelineSchemaCreator { + private TimelineSchemaCreator() { + } + + final static String NAME = TimelineSchemaCreator.class.getSimpleName(); + private static final Logger LOG = + LoggerFactory.getLogger(TimelineSchemaCreator.class); + private static final String SKIP_EXISTING_TABLE_OPTION_SHORT = "s"; + private static final String APP_METRICS_TTL_OPTION_SHORT = "ma"; + private static final String SUB_APP_METRICS_TTL_OPTION_SHORT = "msa"; + private static final String APP_TABLE_NAME_SHORT = "a"; + private static final String SUB_APP_TABLE_NAME_SHORT = "sa"; + private static final String APP_TO_FLOW_TABLE_NAME_SHORT = "a2f"; + private static final String ENTITY_METRICS_TTL_OPTION_SHORT = "me"; + private static final String ENTITY_TABLE_NAME_SHORT = "e"; + private static final String HELP_SHORT = "h"; + private static final String CREATE_TABLES_SHORT = "c"; + + public static void main(String[] args) throws Exception { + + LOG.info("Starting the schema creation"); + Configuration hbaseConf = + HBaseTimelineStorageUtils.getTimelineServiceHBaseConf( + new YarnConfiguration()); + // Grab input args and allow for -Dxyz style arguments + String[] otherArgs = new GenericOptionsParser(hbaseConf, args) + .getRemainingArgs(); + + // Grab the arguments we're looking for. + CommandLine commandLine = parseArgs(otherArgs); + + if (commandLine.hasOption(HELP_SHORT)) { + // -help option has the highest precedence + printUsage(); + } else if (commandLine.hasOption(CREATE_TABLES_SHORT)) { + // Grab the entityTableName argument + String entityTableName = commandLine.getOptionValue( + ENTITY_TABLE_NAME_SHORT); + if (StringUtils.isNotBlank(entityTableName)) { + hbaseConf.set(EntityTableRW.TABLE_NAME_CONF_NAME, entityTableName); + } + // Grab the entity metrics TTL + String entityTableMetricsTTL = commandLine.getOptionValue( + ENTITY_METRICS_TTL_OPTION_SHORT); + if (StringUtils.isNotBlank(entityTableMetricsTTL)) { + int entityMetricsTTL = Integer.parseInt(entityTableMetricsTTL); + new EntityTableRW().setMetricsTTL(entityMetricsTTL, hbaseConf); + } + // Grab the appToflowTableName argument + String appToflowTableName = commandLine.getOptionValue( + APP_TO_FLOW_TABLE_NAME_SHORT); + if (StringUtils.isNotBlank(appToflowTableName)) { + hbaseConf.set( + AppToFlowTableRW.TABLE_NAME_CONF_NAME, appToflowTableName); + } + // Grab the applicationTableName argument + String applicationTableName = commandLine.getOptionValue( + APP_TABLE_NAME_SHORT); + if (StringUtils.isNotBlank(applicationTableName)) { + hbaseConf.set(ApplicationTableRW.TABLE_NAME_CONF_NAME, + applicationTableName); + } + // Grab the application metrics TTL + String applicationTableMetricsTTL = commandLine.getOptionValue( + APP_METRICS_TTL_OPTION_SHORT); + if (StringUtils.isNotBlank(applicationTableMetricsTTL)) { + int appMetricsTTL = Integer.parseInt(applicationTableMetricsTTL); + new ApplicationTableRW().setMetricsTTL(appMetricsTTL, hbaseConf); + } + + // Grab the subApplicationTableName argument + String subApplicationTableName = commandLine.getOptionValue( + SUB_APP_TABLE_NAME_SHORT); + if (StringUtils.isNotBlank(subApplicationTableName)) { + hbaseConf.set(SubApplicationTableRW.TABLE_NAME_CONF_NAME, + subApplicationTableName); + } + // Grab the subApplication metrics TTL + String subApplicationTableMetricsTTL = commandLine + .getOptionValue(SUB_APP_METRICS_TTL_OPTION_SHORT); + if (StringUtils.isNotBlank(subApplicationTableMetricsTTL)) { + int subAppMetricsTTL = Integer.parseInt(subApplicationTableMetricsTTL); + new SubApplicationTableRW().setMetricsTTL(subAppMetricsTTL, hbaseConf); + } + + // create all table schemas in hbase + final boolean skipExisting = commandLine.hasOption( + SKIP_EXISTING_TABLE_OPTION_SHORT); + createAllSchemas(hbaseConf, skipExisting); + } else { + // print usage information if -create is not specified + printUsage(); + } + }
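Besides the -create command line handled in main above, the @VisibleForTesting entry point defined further below allows the same schema creation to be driven programmatically; a minimal sketch (the boolean mirrors the -s/-skipExistingTable behaviour):

    Configuration hbaseConf = HBaseTimelineStorageUtils
        .getTimelineServiceHBaseConf(new YarnConfiguration());
    TimelineSchemaCreator.createAllTables(hbaseConf, true);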
+ + /** + * Parse command-line arguments. + * + * @param args + * command line arguments passed to the program. + * @return parsed command line. + * @throws ParseException if the command-line arguments cannot be parsed. + */ + private static CommandLine parseArgs(String[] args) throws ParseException { + Options options = new Options(); + + // Input + Option o = new Option(HELP_SHORT, "help", false, "print help information"); + o.setRequired(false); + options.addOption(o); + + o = new Option(CREATE_TABLES_SHORT, "create", false, + "a mandatory option to create HBase tables"); + o.setRequired(false); + options.addOption(o); + + o = new Option(ENTITY_TABLE_NAME_SHORT, "entityTableName", true, + "entity table name"); + o.setArgName("entityTableName"); + o.setRequired(false); + options.addOption(o); + + o = new Option(ENTITY_METRICS_TTL_OPTION_SHORT, "entityMetricsTTL", true, + "TTL for metrics column family"); + o.setArgName("entityMetricsTTL"); + o.setRequired(false); + options.addOption(o); + + o = new Option(APP_TO_FLOW_TABLE_NAME_SHORT, "appToflowTableName", true, + "app to flow table name"); + o.setArgName("appToflowTableName"); + o.setRequired(false); + options.addOption(o); + + o = new Option(APP_TABLE_NAME_SHORT, "applicationTableName", true, + "application table name"); + o.setArgName("applicationTableName"); + o.setRequired(false); + options.addOption(o); + + o = new Option(APP_METRICS_TTL_OPTION_SHORT, "applicationMetricsTTL", true, + "TTL for metrics column family"); + o.setArgName("applicationMetricsTTL"); + o.setRequired(false); + options.addOption(o); + + o = new Option(SUB_APP_TABLE_NAME_SHORT, "subApplicationTableName", true, + "subApplication table name"); + o.setArgName("subApplicationTableName"); + o.setRequired(false); + options.addOption(o); + + o = new Option(SUB_APP_METRICS_TTL_OPTION_SHORT, "subApplicationMetricsTTL", + true, "TTL for metrics column family"); + o.setArgName("subApplicationMetricsTTL"); + o.setRequired(false); + options.addOption(o); + + // Options without an argument + // No need to set arg name since we do not need an argument here + o = new Option(SKIP_EXISTING_TABLE_OPTION_SHORT, "skipExistingTable", + false, "skip existing HBase tables and continue to create new tables"); + o.setRequired(false); + options.addOption(o); + + CommandLineParser parser = new PosixParser(); + CommandLine commandLine = null; + try { + commandLine = parser.parse(options, args); + } catch (Exception e) { + LOG.error("ERROR: " + e.getMessage() + "\n"); + HelpFormatter formatter = new HelpFormatter(); + formatter.printHelp(NAME + " ", options, true); + System.exit(-1); + } + + return commandLine; + } + + private static void printUsage() { + StringBuilder usage = new StringBuilder("Command Usage: \n"); + usage.append("TimelineSchemaCreator [-help] Display help info" + + " for all commands. 
Or\n"); + usage.append("TimelineSchemaCreator -create [OPTIONAL_OPTIONS]" + + " Create hbase tables.\n\n"); + usage.append("The Optional options for creating tables include: \n"); + usage.append("[-entityTableName <Entity Table Name>] " + + "The name of the Entity table\n"); + usage.append("[-entityMetricsTTL <Entity Table Metrics TTL>]" + + " TTL for metrics in the Entity table\n"); + usage.append("[-appToflowTableName <AppToflow Table Name>]" + + " The name of the AppToFlow table\n"); + usage.append("[-applicationTableName <Application Table Name>]" + + " The name of the Application table\n"); + usage.append("[-applicationMetricsTTL <Application Table Metrics TTL>]" + + " TTL for metrics in the Application table\n"); + usage.append("[-subApplicationTableName <SubApplication Table Name>]" + + " The name of the SubApplication table\n"); + usage.append("[-subApplicationMetricsTTL " + + " <SubApplication Table Metrics TTL>]" + + " TTL for metrics in the SubApplication table\n"); + usage.append("[-skipExistingTable] Whether to skip existing" + + " hbase tables\n"); + System.out.println(usage.toString()); + } + + /** + * Create all table schemas and log success or exception if failed. + * @param hbaseConf the hbase configuration to create tables with + * @param skipExisting whether to skip existing hbase tables + */ + private static void createAllSchemas(Configuration hbaseConf, + boolean skipExisting) { + List<Exception> exceptions = new ArrayList<>(); + try { + if (skipExisting) { + LOG.info("Will skip existing tables and continue on htable creation " + + "exceptions!"); + } + createAllTables(hbaseConf, skipExisting); + LOG.info("Successfully created HBase schema. "); + } catch (IOException e) { + LOG.error("Error in creating hbase tables: ", e); + exceptions.add(e); + } + + if (exceptions.size() > 0) { + LOG.warn("Schema creation finished with the following exceptions"); + for (Exception e : exceptions) { + LOG.warn(e.getMessage()); + } + System.exit(-1); + } else { + LOG.info("Schema creation finished successfully"); + } + } + + @VisibleForTesting + public static void createAllTables(Configuration hbaseConf, + boolean skipExisting) throws IOException { + + Connection conn = null; + try { + conn = ConnectionFactory.createConnection(hbaseConf); + Admin admin = conn.getAdmin(); + if (admin == null) { + throw new IOException("Cannot create table since admin is null"); + } + try { + new EntityTableRW().createTable(admin, hbaseConf); + } catch (IOException e) { + if (skipExisting) { + LOG.warn("Skip and continue on: " + e.getMessage()); + } else { + throw e; + } + } + try { + new AppToFlowTableRW().createTable(admin, hbaseConf); + } catch (IOException e) { + if (skipExisting) { + LOG.warn("Skip and continue on: " + e.getMessage()); + } else { + throw e; + } + } + try { + new ApplicationTableRW().createTable(admin, hbaseConf); + } catch (IOException e) { + if (skipExisting) { + LOG.warn("Skip and continue on: " + e.getMessage()); + } else { + throw e; + } + } + try { + new FlowRunTableRW().createTable(admin, hbaseConf); + } catch (IOException e) { + if (skipExisting) { + LOG.warn("Skip and continue on: " + e.getMessage()); + } else { + throw e; + } + } + try { + new FlowActivityTableRW().createTable(admin, hbaseConf); + } catch (IOException e) { + if (skipExisting) { + LOG.warn("Skip and continue on: " + e.getMessage()); + } else { + throw e; + } + } + try { + new SubApplicationTableRW().createTable(admin, hbaseConf); + } catch (IOException e) { + if (skipExisting) { + LOG.warn("Skip and continue 
on: " + e.getMessage()); + } else { + throw e; + } + } + } finally { + if (conn != null) { + conn.close(); + } + } + } + + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationTableRW.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationTableRW.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationTableRW.java new file mode 100644 index 0000000..808994e --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationTableRW.java @@ -0,0 +1,137 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage.application; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.regionserver.BloomType; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTableRW; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineHBaseSchemaConstants; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Create, read and write to the Application Table. + */ +public class ApplicationTableRW extends BaseTableRW<ApplicationTable> { + /** application prefix. */ + private static final String PREFIX = + YarnConfiguration.TIMELINE_SERVICE_PREFIX + "application"; + + /** config param name that specifies the application table name. */ + public static final String TABLE_NAME_CONF_NAME = PREFIX + ".table.name"; + + /** + * config param name that specifies the TTL for metrics column family in + * application table. 
+ */ + private static final String METRICS_TTL_CONF_NAME = PREFIX + + ".table.metrics.ttl"; + + /** + * config param name that specifies max-versions for metrics column family in + * the application table. + */ + private static final String METRICS_MAX_VERSIONS = + PREFIX + ".table.metrics.max-versions"; + + /** default value for application table name. */ + private static final String DEFAULT_TABLE_NAME = + "timelineservice.application"; + + /** default TTL is 30 days for metrics timeseries. */ + private static final int DEFAULT_METRICS_TTL = 2592000; + + /** default max number of versions. */ + private static final int DEFAULT_METRICS_MAX_VERSIONS = 10000; + + private static final Logger LOG = + LoggerFactory.getLogger(ApplicationTableRW.class); + + public ApplicationTableRW() { + super(TABLE_NAME_CONF_NAME, DEFAULT_TABLE_NAME); + } + + /* + * (non-Javadoc) + * + * @see + * org.apache.hadoop.yarn.server.timelineservice.storage.BaseTableRW# + * createTable(org.apache.hadoop.hbase.client.Admin, + * org.apache.hadoop.conf.Configuration) + */ + public void createTable(Admin admin, Configuration hbaseConf) + throws IOException { + + TableName table = getTableName(hbaseConf); + if (admin.tableExists(table)) { + // do not disable / delete existing table + // similar to the approach taken by map-reduce jobs when + // output directory exists + throw new IOException("Table " + table.getNameAsString() + + " already exists."); + } + + HTableDescriptor applicationTableDescp = new HTableDescriptor(table); + HColumnDescriptor infoCF = + new HColumnDescriptor(ApplicationColumnFamily.INFO.getBytes()); + infoCF.setBloomFilterType(BloomType.ROWCOL); + applicationTableDescp.addFamily(infoCF); + + HColumnDescriptor configCF = + new HColumnDescriptor(ApplicationColumnFamily.CONFIGS.getBytes()); + configCF.setBloomFilterType(BloomType.ROWCOL); + configCF.setBlockCacheEnabled(true); + applicationTableDescp.addFamily(configCF); + + HColumnDescriptor metricsCF = + new HColumnDescriptor(ApplicationColumnFamily.METRICS.getBytes()); + metricsCF.setBlockCacheEnabled(true); + // always keep 1 version (the latest) + metricsCF.setMinVersions(1); + metricsCF.setMaxVersions( + hbaseConf.getInt(METRICS_MAX_VERSIONS, DEFAULT_METRICS_MAX_VERSIONS)); + metricsCF.setTimeToLive(hbaseConf.getInt(METRICS_TTL_CONF_NAME, + DEFAULT_METRICS_TTL)); + applicationTableDescp.addFamily(metricsCF); + applicationTableDescp.setRegionSplitPolicyClassName( + "org.apache.hadoop.hbase.regionserver.KeyPrefixRegionSplitPolicy"); + applicationTableDescp.setValue("KeyPrefixRegionSplitPolicy.prefix_length", + TimelineHBaseSchemaConstants.USERNAME_SPLIT_KEY_PREFIX_LENGTH); + admin.createTable(applicationTableDescp, + TimelineHBaseSchemaConstants.getUsernameSplits()); + LOG.info("Status of table creation for " + table.getNameAsString() + "=" + + admin.tableExists(table)); + } + + /** + * @param metricsTTL time to live parameter for the metrics in this table. + * @param hbaseConf configuration in which to set the metrics TTL config + * variable. 
+ */ + public void setMetricsTTL(int metricsTTL, Configuration hbaseConf) { + hbaseConf.setInt(METRICS_TTL_CONF_NAME, metricsTTL); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/package-info.java new file mode 100644 index 0000000..03f508f --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/package-info.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Package org.apache.hadoop.yarn.server.timelineservice.storage.application + * contains classes related to the implementation of the application table. 
+ */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +package org.apache.hadoop.yarn.server.timelineservice.storage.application; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowTableRW.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowTableRW.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowTableRW.java new file mode 100644 index 0000000..6460203 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowTableRW.java @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow; + + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.regionserver.BloomType; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTableRW; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineHBaseSchemaConstants; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +/** + * Create, read and write to the AppToFlow Table. + */ +public class AppToFlowTableRW extends BaseTableRW<AppToFlowTable> { + /** app_flow prefix. */ + private static final String PREFIX = + YarnConfiguration.TIMELINE_SERVICE_PREFIX + "app-flow"; + + /** config param name that specifies the app_flow table name. */ + public static final String TABLE_NAME_CONF_NAME = PREFIX + ".table.name"; + + /** default value for app_flow table name. 
*/ + private static final String DEFAULT_TABLE_NAME = "timelineservice.app_flow"; + + private static final Logger LOG = + LoggerFactory.getLogger(AppToFlowTableRW.class); + + public AppToFlowTableRW() { + super(TABLE_NAME_CONF_NAME, DEFAULT_TABLE_NAME); + } + + /* + * (non-Javadoc) + * + * @see + * org.apache.hadoop.yarn.server.timelineservice.storage.BaseTableRW# + * createTable(org.apache.hadoop.hbase.client.Admin, + * org.apache.hadoop.conf.Configuration) + */ + public void createTable(Admin admin, Configuration hbaseConf) + throws IOException { + + TableName table = getTableName(hbaseConf); + if (admin.tableExists(table)) { + // do not disable / delete existing table + // similar to the approach taken by map-reduce jobs when + // output directory exists + throw new IOException("Table " + table.getNameAsString() + + " already exists."); + } + + HTableDescriptor appToFlowTableDescp = new HTableDescriptor(table); + HColumnDescriptor mappCF = + new HColumnDescriptor(AppToFlowColumnFamily.MAPPING.getBytes()); + mappCF.setBloomFilterType(BloomType.ROWCOL); + appToFlowTableDescp.addFamily(mappCF); + + appToFlowTableDescp + .setRegionSplitPolicyClassName( + "org.apache.hadoop.hbase.regionserver.KeyPrefixRegionSplitPolicy"); + appToFlowTableDescp.setValue("KeyPrefixRegionSplitPolicy.prefix_length", + TimelineHBaseSchemaConstants.USERNAME_SPLIT_KEY_PREFIX_LENGTH); + admin.createTable(appToFlowTableDescp, + TimelineHBaseSchemaConstants.getUsernameSplits()); + LOG.info("Status of table creation for " + table.getNameAsString() + "=" + + admin.tableExists(table)); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/package-info.java new file mode 100644 index 0000000..f01d982 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/package-info.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Package org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow + * contains classes related to the implementation of the app to flow table. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +package org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BaseTableRW.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BaseTableRW.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BaseTableRW.java new file mode 100644 index 0000000..12ebce4 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BaseTableRW.java @@ -0,0 +1,167 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage.common; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.BufferedMutator; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.yarn.conf.YarnConfiguration; + +/** + * Implements behavior common to tables used in the timeline service storage. It + * is thread-safe, and can be used by multiple threads concurrently. + * + * @param <T> reference to the table instance class itself for type safety. + */ +public abstract class BaseTableRW<T extends BaseTable<T>> { + + /** + * Name of config variable that is used to point to this table. 
+ */ + private final String tableNameConfName; + + /** + * Unless the configuration overrides, this will be the default name for the + * table when it is created. + */ + private final String defaultTableName; + + /** + * @param tableNameConfName name of config variable that is used to point to + * this table. + * @param defaultTableName Default table name if table from config is not + * found. + */ + protected BaseTableRW(String tableNameConfName, String defaultTableName) { + this.tableNameConfName = tableNameConfName; + this.defaultTableName = defaultTableName; + } + + /** + * Used to create a type-safe mutator for this table. + * + * @param hbaseConf used to read the table name. + * @param conn connection used to create the mutator. + * @return a type-safe {@link BufferedMutator} for this table. + * @throws IOException if any exception occurs while creating mutator for the + * table. + */ + public TypedBufferedMutator<T> getTableMutator(Configuration hbaseConf, + Connection conn) throws IOException { + + TableName tableName = this.getTableName(hbaseConf); + + // Plain buffered mutator + BufferedMutator bufferedMutator = conn.getBufferedMutator(tableName); + + // Wrap the plain mutator so that callers hold on to a mutator typed to + // this table. + return new TypedBufferedMutator<T>(bufferedMutator); + } + + /** + * @param hbaseConf used to read settings that override defaults. + * @param conn connection used to access the table. + * @param scan scan that specifies what to read from this table. + * @return scanner for the table. + * @throws IOException if any exception occurs while getting the scanner. + */ + public ResultScanner getResultScanner(Configuration hbaseConf, + Connection conn, Scan scan) throws IOException { + Table table = conn.getTable(getTableName(hbaseConf)); + return table.getScanner(scan); + } + + /** + * @param hbaseConf used to read settings that override defaults. + * @param conn connection used to access the table. + * @param get the get operation that specifies the single row to fetch. + * @return result of the get operation. + * @throws IOException if any exception occurs while getting the result. + */ + public Result getResult(Configuration hbaseConf, Connection conn, Get get) + throws IOException { + Table table = conn.getTable(getTableName(hbaseConf)); + return table.get(get); + } + + /** + * Get the table name for the input table. + * + * @param conf HBase configuration from which the table name will be fetched. + * @param tableName name of the table to be fetched. + * @return A {@link TableName} object. + */ + public static TableName getTableName(Configuration conf, String tableName) { + String tableSchemaPrefix = conf.get( + YarnConfiguration.TIMELINE_SERVICE_HBASE_SCHEMA_PREFIX_NAME, + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_HBASE_SCHEMA_PREFIX); + return TableName.valueOf(tableSchemaPrefix + tableName); + } + + /** + * Get the table name for this table. + * + * @param conf HBase configuration from which the table name will be fetched. + * @return A {@link TableName} object. + */ + public TableName getTableName(Configuration conf) { + String tableName = conf.get(tableNameConfName, defaultTableName); + return getTableName(conf, tableName); + } + + /** + * Get the table name based on the input config parameters. + * + * @param conf HBase configuration from which the table name will be fetched. + * @param tableNameInConf the table name parameter in conf. + * @param defaultTableName the default table name. 
+ * @return A {@link TableName} object. + */ + public static TableName getTableName(Configuration conf, + String tableNameInConf, String defaultTableName) { + String tableName = conf.get(tableNameInConf, defaultTableName); + return getTableName(conf, tableName); + } + + /** + * Used to create the table in HBase. Should be called only once (per HBase + * instance). + * + * @param admin Used for doing HBase table operations. + * @param hbaseConf Hbase configuration. + * @throws IOException if any exception occurs while creating the table. + */ + public abstract void createTable(Admin admin, Configuration hbaseConf) + throws IOException; + +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]

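A companion sketch for the read path that BaseTableRW exposes. The literal row key is a placeholder, since the real key encodings are produced by row-key classes elsewhere in the timeline service storage code, and ReadRowExample is a hypothetical name.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTableRW;

public final class ReadRowExample {
  public static void main(String[] args) throws IOException {
    Configuration hbaseConf = HBaseConfiguration.create();
    // ApplicationTableRW inherits getResult from BaseTableRW, which
    // resolves the configured table name and issues the Get.
    ApplicationTableRW applicationTable = new ApplicationTableRW();
    try (Connection conn = ConnectionFactory.createConnection(hbaseConf)) {
      Get get = new Get(Bytes.toBytes("placeholder-row-key"));
      Result result = applicationTable.getResult(hbaseConf, conn, get);
      System.out.println("Row found? " + (!result.isEmpty()));
    }
  }
}

Because the table name is re-read from the configuration on every call, the same code works unchanged against a renamed table by overriding the parameter named by TABLE_NAME_CONF_NAME.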