http://git-wip-us.apache.org/repos/asf/hadoop/blob/47ec7f92/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestSeparator.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestSeparator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestSeparator.java new file mode 100644 index 0000000..7d37206 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestSeparator.java @@ -0,0 +1,215 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage.common; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.Test; + +import com.google.common.collect.Iterables; + +public class TestSeparator { + + private static String villain = "Dr. Heinz Doofenshmirtz"; + private static String special = + ". * | ? + \t ( ) [ ] { } ^ $ \\ \" %"; + + /** + * + */ + @Test + public void testEncodeDecodeString() { + + for (Separator separator : Separator.values()) { + testEncodeDecode(separator, ""); + testEncodeDecode(separator, " "); + testEncodeDecode(separator, "!"); + testEncodeDecode(separator, "?"); + testEncodeDecode(separator, "&"); + testEncodeDecode(separator, "+"); + testEncodeDecode(separator, "\t"); + testEncodeDecode(separator, "Dr."); + testEncodeDecode(separator, "Heinz"); + testEncodeDecode(separator, "Doofenshmirtz"); + testEncodeDecode(separator, villain); + testEncodeDecode(separator, special); + + assertNull(separator.encode(null)); + + } + } + + private void testEncodeDecode(Separator separator, String token) { + String encoded = separator.encode(token); + String decoded = separator.decode(encoded); + String msg = "token:" + token + " separator:" + separator + "."; + assertEquals(msg, token, decoded); + } + + @Test + public void testEncodeDecode() { + testEncodeDecode("Dr.", Separator.QUALIFIERS); + testEncodeDecode("Heinz", Separator.QUALIFIERS, Separator.QUALIFIERS); + testEncodeDecode("Doofenshmirtz", Separator.QUALIFIERS, null, + Separator.QUALIFIERS); + testEncodeDecode("&Perry", Separator.QUALIFIERS, Separator.VALUES, null); + testEncodeDecode("the ", Separator.QUALIFIERS, Separator.SPACE); + testEncodeDecode("Platypus...", (Separator) null); + testEncodeDecode("The what now ?!?", Separator.QUALIFIERS, + Separator.VALUES, Separator.SPACE); + + } + @Test + public void testEncodedValues() { + testEncodeDecode("Double-escape %2$ and %9$ or %%2$ or %%3$, nor %%%2$" + + "= no problem!", + Separator.QUALIFIERS, Separator.VALUES, Separator.SPACE, Separator.TAB); + } + + @Test + public void testSplits() { + byte[] maxLongBytes = Bytes.toBytes(Long.MAX_VALUE); + byte[] maxIntBytes = Bytes.toBytes(Integer.MAX_VALUE); + for (Separator separator : Separator.values()) { + String str1 = "cl" + separator.getValue() + "us"; + String str2 = separator.getValue() + "rst"; + byte[] sepByteArr = Bytes.toBytes(separator.getValue()); + byte[] longVal1Arr = Bytes.add(sepByteArr, Bytes.copy(maxLongBytes, + sepByteArr.length, Bytes.SIZEOF_LONG - sepByteArr.length)); + byte[] intVal1Arr = Bytes.add(sepByteArr, Bytes.copy(maxIntBytes, + sepByteArr.length, Bytes.SIZEOF_INT - sepByteArr.length)); + byte[] arr = separator.join( + Bytes.toBytes(separator.encode(str1)), longVal1Arr, + Bytes.toBytes(separator.encode(str2)), intVal1Arr); + int[] sizes = {Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG, + Separator.VARIABLE_SIZE, Bytes.SIZEOF_INT}; + byte[][] splits = separator.split(arr, sizes); + assertEquals(4, splits.length); + assertEquals(str1, separator.decode(Bytes.toString(splits[0]))); + assertEquals(Bytes.toLong(longVal1Arr), Bytes.toLong(splits[1])); + assertEquals(str2, separator.decode(Bytes.toString(splits[2]))); + assertEquals(Bytes.toInt(intVal1Arr), Bytes.toInt(splits[3])); + + longVal1Arr = Bytes.add(Bytes.copy(maxLongBytes, 0, Bytes.SIZEOF_LONG - + sepByteArr.length), sepByteArr); + intVal1Arr = Bytes.add(Bytes.copy(maxIntBytes, 0, Bytes.SIZEOF_INT - + sepByteArr.length), sepByteArr); + arr = separator.join(Bytes.toBytes(separator.encode(str1)), longVal1Arr, + Bytes.toBytes(separator.encode(str2)), intVal1Arr); + splits = separator.split(arr, sizes); + assertEquals(4, splits.length); + assertEquals(str1, separator.decode(Bytes.toString(splits[0]))); + assertEquals(Bytes.toLong(longVal1Arr), Bytes.toLong(splits[1])); + assertEquals(str2, separator.decode(Bytes.toString(splits[2]))); + assertEquals(Bytes.toInt(intVal1Arr), Bytes.toInt(splits[3])); + + longVal1Arr = Bytes.add(sepByteArr, Bytes.copy(maxLongBytes, + sepByteArr.length, 4 - sepByteArr.length), sepByteArr); + longVal1Arr = Bytes.add(longVal1Arr, Bytes.copy(maxLongBytes, 4, 3 - + sepByteArr.length), sepByteArr); + arr = separator.join(Bytes.toBytes(separator.encode(str1)), longVal1Arr, + Bytes.toBytes(separator.encode(str2)), intVal1Arr); + splits = separator.split(arr, sizes); + assertEquals(4, splits.length); + assertEquals(str1, separator.decode(Bytes.toString(splits[0]))); + assertEquals(Bytes.toLong(longVal1Arr), Bytes.toLong(splits[1])); + assertEquals(str2, separator.decode(Bytes.toString(splits[2]))); + assertEquals(Bytes.toInt(intVal1Arr), Bytes.toInt(splits[3])); + + arr = separator.join(Bytes.toBytes(separator.encode(str1)), + Bytes.toBytes(separator.encode(str2)), intVal1Arr, longVal1Arr); + int[] sizes1 = {Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE, + Bytes.SIZEOF_INT, Bytes.SIZEOF_LONG}; + splits = separator.split(arr, sizes1); + assertEquals(4, splits.length); + assertEquals(str1, separator.decode(Bytes.toString(splits[0]))); + assertEquals(str2, separator.decode(Bytes.toString(splits[1]))); + assertEquals(Bytes.toInt(intVal1Arr), Bytes.toInt(splits[2])); + assertEquals(Bytes.toLong(longVal1Arr), Bytes.toLong(splits[3])); + + try { + int[] sizes2 = {Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE, + Bytes.SIZEOF_INT, 7}; + splits = separator.split(arr, sizes2); + fail("Exception should have been thrown."); + } catch (IllegalArgumentException e) {} + + try { + int[] sizes2 = {Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE, 2, + Bytes.SIZEOF_LONG}; + splits = separator.split(arr, sizes2); + fail("Exception should have been thrown."); + } catch (IllegalArgumentException e) {} + } + } + + /** + * Simple test to encode and decode using the same separators and confirm that + * we end up with the same as what we started with. + * + * @param token + * @param separators + */ + private static void testEncodeDecode(String token, Separator... separators) { + byte[] encoded = Separator.encode(token, separators); + String decoded = Separator.decode(encoded, separators); + assertEquals(token, decoded); + } + + @Test + public void testJoinStripped() { + List<String> stringList = new ArrayList<String>(0); + stringList.add("nothing"); + + String joined = Separator.VALUES.joinEncoded(stringList); + Iterable<String> split = Separator.VALUES.splitEncoded(joined); + assertTrue(Iterables.elementsEqual(stringList, split)); + + stringList = new ArrayList<String>(3); + stringList.add("a"); + stringList.add("b?"); + stringList.add("c"); + + joined = Separator.VALUES.joinEncoded(stringList); + split = Separator.VALUES.splitEncoded(joined); + assertTrue(Iterables.elementsEqual(stringList, split)); + + String[] stringArray1 = {"else"}; + joined = Separator.VALUES.joinEncoded(stringArray1); + split = Separator.VALUES.splitEncoded(joined); + assertTrue(Iterables.elementsEqual(Arrays.asList(stringArray1), split)); + + String[] stringArray2 = {"d", "e?", "f"}; + joined = Separator.VALUES.joinEncoded(stringArray2); + split = Separator.VALUES.splitEncoded(joined); + assertTrue(Iterables.elementsEqual(Arrays.asList(stringArray2), split)); + + List<String> empty = new ArrayList<String>(0); + split = Separator.VALUES.splitEncoded(null); + assertTrue(Iterables.elementsEqual(empty, split)); + + } + +}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/47ec7f92/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml index 7a5e38a..e0e7659 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml @@ -97,11 +97,6 @@ </dependency> <dependency> - <groupId>commons-cli</groupId> - <artifactId>commons-cli</artifactId> - </dependency> - - <dependency> <groupId>commons-lang</groupId> <artifactId>commons-lang</artifactId> </dependency> @@ -132,51 +127,6 @@ <version>1.1.1</version> </dependency> - <dependency> - <groupId>org.apache.hbase</groupId> - <artifactId>hbase-common</artifactId> - <exclusions> - <exclusion> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-mapreduce-client-core</artifactId> - </exclusion> - </exclusions> - </dependency> - - <dependency> - <groupId>org.apache.hbase</groupId> - <artifactId>hbase-client</artifactId> - <exclusions> - <exclusion> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-mapreduce-client-core</artifactId> - </exclusion> - </exclusions> - </dependency> - - <dependency> - <groupId>org.apache.hbase</groupId> - <artifactId>hbase-server</artifactId> - <exclusions> - <exclusion> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-hdfs</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-hdfs-client</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-client</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-mapreduce-client-core</artifactId> - </exclusion> - </exclusions> - </dependency> - <!-- 'mvn dependency:analyze' fails to detect use of this dependency --> <dependency> <groupId>org.apache.hadoop</groupId> http://git-wip-us.apache.org/repos/asf/hadoop/blob/47ec7f92/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterUtils.java deleted file mode 100644 index 8e38e95..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterUtils.java +++ /dev/null @@ -1,307 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.timelineservice.reader.filter; - -import java.io.IOException; -import java.util.HashSet; -import java.util.Set; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.filter.BinaryComparator; -import org.apache.hadoop.hbase.filter.BinaryPrefixComparator; -import org.apache.hadoop.hbase.filter.FamilyFilter; -import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; -import org.apache.hadoop.hbase.filter.Filter; -import org.apache.hadoop.hbase.filter.FilterList; -import org.apache.hadoop.hbase.filter.FilterList.Operator; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.Column; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix; -import org.apache.hadoop.hbase.filter.QualifierFilter; -import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; - -/** - * Set of utility methods used by timeline filter classes. - */ -public final class TimelineFilterUtils { - - private static final Log LOG = LogFactory.getLog(TimelineFilterUtils.class); - - private TimelineFilterUtils() { - } - - /** - * Returns the equivalent HBase filter list's {@link Operator}. - * - * @param op timeline filter list operator. - * @return HBase filter list's Operator. - */ - private static Operator getHBaseOperator(TimelineFilterList.Operator op) { - switch (op) { - case AND: - return Operator.MUST_PASS_ALL; - case OR: - return Operator.MUST_PASS_ONE; - default: - throw new IllegalArgumentException("Invalid operator"); - } - } - - /** - * Returns the equivalent HBase compare filter's {@link CompareOp}. - * - * @param op timeline compare op. - * @return HBase compare filter's CompareOp. - */ - private static CompareOp getHBaseCompareOp( - TimelineCompareOp op) { - switch (op) { - case LESS_THAN: - return CompareOp.LESS; - case LESS_OR_EQUAL: - return CompareOp.LESS_OR_EQUAL; - case EQUAL: - return CompareOp.EQUAL; - case NOT_EQUAL: - return CompareOp.NOT_EQUAL; - case GREATER_OR_EQUAL: - return CompareOp.GREATER_OR_EQUAL; - case GREATER_THAN: - return CompareOp.GREATER; - default: - throw new IllegalArgumentException("Invalid compare operator"); - } - } - - /** - * Converts a {@link TimelinePrefixFilter} to an equivalent HBase - * {@link QualifierFilter}. - * @param colPrefix - * @param filter - * @return a {@link QualifierFilter} object - */ - private static <T> Filter createHBaseColQualPrefixFilter( - ColumnPrefix<T> colPrefix, TimelinePrefixFilter filter) { - return new QualifierFilter(getHBaseCompareOp(filter.getCompareOp()), - new BinaryPrefixComparator( - colPrefix.getColumnPrefixBytes(filter.getPrefix()))); - } - - /** - * Create a HBase {@link QualifierFilter} for the passed column prefix and - * compare op. - * - * @param <T> Describes the type of column prefix. - * @param compareOp compare op. - * @param columnPrefix column prefix. - * @return a column qualifier filter. - */ - public static <T> Filter createHBaseQualifierFilter(CompareOp compareOp, - ColumnPrefix<T> columnPrefix) { - return new QualifierFilter(compareOp, - new BinaryPrefixComparator( - columnPrefix.getColumnPrefixBytes(""))); - } - - /** - * Create filters for confs or metrics to retrieve. This list includes a - * configs/metrics family filter and relevant filters for confs/metrics to - * retrieve, if present. - * - * @param <T> Describes the type of column prefix. - * @param confsOrMetricToRetrieve configs/metrics to retrieve. - * @param columnFamily config or metric column family. - * @param columnPrefix config or metric column prefix. - * @return a filter list. - * @throws IOException if any problem occurs while creating the filters. - */ - public static <T> Filter createFilterForConfsOrMetricsToRetrieve( - TimelineFilterList confsOrMetricToRetrieve, ColumnFamily<T> columnFamily, - ColumnPrefix<T> columnPrefix) throws IOException { - Filter familyFilter = new FamilyFilter(CompareOp.EQUAL, - new BinaryComparator(columnFamily.getBytes())); - if (confsOrMetricToRetrieve != null && - !confsOrMetricToRetrieve.getFilterList().isEmpty()) { - // If confsOrMetricsToRetrive are specified, create a filter list based - // on it and family filter. - FilterList filter = new FilterList(familyFilter); - filter.addFilter( - createHBaseFilterList(columnPrefix, confsOrMetricToRetrieve)); - return filter; - } else { - // Only the family filter needs to be added. - return familyFilter; - } - } - - /** - * Create 2 HBase {@link SingleColumnValueFilter} filters for the specified - * value range represented by start and end value and wraps them inside a - * filter list. Start and end value should not be null. - * - * @param <T> Describes the type of column prefix. - * @param column Column for which single column value filter is to be created. - * @param startValue Start value. - * @param endValue End value. - * @return 2 single column value filters wrapped in a filter list. - * @throws IOException if any problem is encountered while encoding value. - */ - public static <T> FilterList createSingleColValueFiltersByRange( - Column<T> column, Object startValue, Object endValue) throws IOException { - FilterList list = new FilterList(); - Filter singleColValFilterStart = createHBaseSingleColValueFilter( - column.getColumnFamilyBytes(), column.getColumnQualifierBytes(), - column.getValueConverter().encodeValue(startValue), - CompareOp.GREATER_OR_EQUAL, true); - list.addFilter(singleColValFilterStart); - - Filter singleColValFilterEnd = createHBaseSingleColValueFilter( - column.getColumnFamilyBytes(), column.getColumnQualifierBytes(), - column.getValueConverter().encodeValue(endValue), - CompareOp.LESS_OR_EQUAL, true); - list.addFilter(singleColValFilterEnd); - return list; - } - - /** - * Creates a HBase {@link SingleColumnValueFilter} with specified column. - * @param <T> Describes the type of column prefix. - * @param column Column which value to be filtered. - * @param value Value to be filtered. - * @param op Compare operator - * @return a SingleColumnValue Filter - * @throws IOException if any exception. - */ - public static <T> Filter createHBaseSingleColValueFilter(Column<T> column, - Object value, CompareOp op) throws IOException { - Filter singleColValFilter = createHBaseSingleColValueFilter( - column.getColumnFamilyBytes(), column.getColumnQualifierBytes(), - column.getValueConverter().encodeValue(value), op, true); - return singleColValFilter; - } - - /** - * Creates a HBase {@link SingleColumnValueFilter}. - * - * @param columnFamily Column Family represented as bytes. - * @param columnQualifier Column Qualifier represented as bytes. - * @param value Value. - * @param compareOp Compare operator. - * @param filterIfMissing This flag decides if we should filter the row if the - * specified column is missing. This is based on the filter's keyMustExist - * field. - * @return a {@link SingleColumnValueFilter} object - * @throws IOException - */ - private static SingleColumnValueFilter createHBaseSingleColValueFilter( - byte[] columnFamily, byte[] columnQualifier, byte[] value, - CompareOp compareOp, boolean filterIfMissing) throws IOException { - SingleColumnValueFilter singleColValFilter = - new SingleColumnValueFilter(columnFamily, columnQualifier, compareOp, - new BinaryComparator(value)); - singleColValFilter.setLatestVersionOnly(true); - singleColValFilter.setFilterIfMissing(filterIfMissing); - return singleColValFilter; - } - - /** - * Fetch columns from filter list containing exists and multivalue equality - * filters. This is done to fetch only required columns from back-end and - * then match event filters or relationships in reader. - * - * @param filterList filter list. - * @return set of columns. - */ - public static Set<String> fetchColumnsFromFilterList( - TimelineFilterList filterList) { - Set<String> strSet = new HashSet<String>(); - for (TimelineFilter filter : filterList.getFilterList()) { - switch(filter.getFilterType()) { - case LIST: - strSet.addAll(fetchColumnsFromFilterList((TimelineFilterList)filter)); - break; - case KEY_VALUES: - strSet.add(((TimelineKeyValuesFilter)filter).getKey()); - break; - case EXISTS: - strSet.add(((TimelineExistsFilter)filter).getValue()); - break; - default: - LOG.info("Unexpected filter type " + filter.getFilterType()); - break; - } - } - return strSet; - } - - /** - * Creates equivalent HBase {@link FilterList} from {@link TimelineFilterList} - * while converting different timeline filters(of type {@link TimelineFilter}) - * into their equivalent HBase filters. - * - * @param <T> Describes the type of column prefix. - * @param colPrefix column prefix which will be used for conversion. - * @param filterList timeline filter list which has to be converted. - * @return A {@link FilterList} object. - * @throws IOException if any problem occurs while creating the filter list. - */ - public static <T> FilterList createHBaseFilterList(ColumnPrefix<T> colPrefix, - TimelineFilterList filterList) throws IOException { - FilterList list = - new FilterList(getHBaseOperator(filterList.getOperator())); - for (TimelineFilter filter : filterList.getFilterList()) { - switch(filter.getFilterType()) { - case LIST: - list.addFilter(createHBaseFilterList(colPrefix, - (TimelineFilterList)filter)); - break; - case PREFIX: - list.addFilter(createHBaseColQualPrefixFilter(colPrefix, - (TimelinePrefixFilter)filter)); - break; - case COMPARE: - TimelineCompareFilter compareFilter = (TimelineCompareFilter)filter; - list.addFilter( - createHBaseSingleColValueFilter( - colPrefix.getColumnFamilyBytes(), - colPrefix.getColumnPrefixBytes(compareFilter.getKey()), - colPrefix.getValueConverter(). - encodeValue(compareFilter.getValue()), - getHBaseCompareOp(compareFilter.getCompareOp()), - compareFilter.getKeyMustExist())); - break; - case KEY_VALUE: - TimelineKeyValueFilter kvFilter = (TimelineKeyValueFilter)filter; - list.addFilter( - createHBaseSingleColValueFilter( - colPrefix.getColumnFamilyBytes(), - colPrefix.getColumnPrefixBytes(kvFilter.getKey()), - colPrefix.getValueConverter().encodeValue(kvFilter.getValue()), - getHBaseCompareOp(kvFilter.getCompareOp()), - kvFilter.getKeyMustExist())); - break; - default: - LOG.info("Unexpected filter type " + filter.getFilterType()); - break; - } - } - return list; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/47ec7f92/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java deleted file mode 100644 index ce20113..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java +++ /dev/null @@ -1,96 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.server.timelineservice.storage; - - -import java.io.IOException; -import java.util.Set; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.ConnectionFactory; -import org.apache.hadoop.service.AbstractService; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; -import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve; -import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters; -import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils; -import org.apache.hadoop.yarn.server.timelineservice.storage.reader.EntityTypeReader; -import org.apache.hadoop.yarn.server.timelineservice.storage.reader.TimelineEntityReader; -import org.apache.hadoop.yarn.server.timelineservice.storage.reader.TimelineEntityReaderFactory; - -/** - * HBase based implementation for {@link TimelineReader}. - */ -public class HBaseTimelineReaderImpl - extends AbstractService implements TimelineReader { - - private static final Log LOG = LogFactory - .getLog(HBaseTimelineReaderImpl.class); - - private Configuration hbaseConf = null; - private Connection conn; - - public HBaseTimelineReaderImpl() { - super(HBaseTimelineReaderImpl.class.getName()); - } - - @Override - public void serviceInit(Configuration conf) throws Exception { - super.serviceInit(conf); - hbaseConf = HBaseTimelineStorageUtils.getTimelineServiceHBaseConf(conf); - conn = ConnectionFactory.createConnection(hbaseConf); - } - - @Override - protected void serviceStop() throws Exception { - if (conn != null) { - LOG.info("closing the hbase Connection"); - conn.close(); - } - super.serviceStop(); - } - - @Override - public TimelineEntity getEntity(TimelineReaderContext context, - TimelineDataToRetrieve dataToRetrieve) throws IOException { - TimelineEntityReader reader = - TimelineEntityReaderFactory.createSingleEntityReader(context, - dataToRetrieve); - return reader.readEntity(hbaseConf, conn); - } - - @Override - public Set<TimelineEntity> getEntities(TimelineReaderContext context, - TimelineEntityFilters filters, TimelineDataToRetrieve dataToRetrieve) - throws IOException { - TimelineEntityReader reader = - TimelineEntityReaderFactory.createMultipleEntitiesReader(context, - filters, dataToRetrieve); - return reader.readEntities(hbaseConf, conn); - } - - @Override - public Set<String> getEntityTypes(TimelineReaderContext context) - throws IOException { - EntityTypeReader reader = new EntityTypeReader(context); - return reader.readEntityTypes(hbaseConf, conn); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/47ec7f92/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java deleted file mode 100644 index dfd63bf..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java +++ /dev/null @@ -1,542 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.server.timelineservice.storage; - -import java.io.IOException; -import java.util.Map; -import java.util.Set; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.ConnectionFactory; -import org.apache.hadoop.service.AbstractService; -import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse; -import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants; -import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumn; -import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix; -import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey; -import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable; -import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowColumnPrefix; -import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey; -import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.EventColumnName; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongKeyConverter; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.StringKeyConverter; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator; -import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn; -import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix; -import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey; -import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable; -import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationCompactionDimension; -import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationOperation; -import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute; -import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityColumnPrefix; -import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKey; -import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTable; -import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumn; -import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnPrefix; -import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKey; -import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable; - -/** - * This implements a hbase based backend for storing the timeline entity - * information. - * It writes to multiple tables at the backend - */ -@InterfaceAudience.Private -@InterfaceStability.Unstable -public class HBaseTimelineWriterImpl extends AbstractService implements - TimelineWriter { - - private static final Log LOG = LogFactory - .getLog(HBaseTimelineWriterImpl.class); - - private Connection conn; - private TypedBufferedMutator<EntityTable> entityTable; - private TypedBufferedMutator<AppToFlowTable> appToFlowTable; - private TypedBufferedMutator<ApplicationTable> applicationTable; - private TypedBufferedMutator<FlowActivityTable> flowActivityTable; - private TypedBufferedMutator<FlowRunTable> flowRunTable; - - /** - * Used to convert strings key components to and from storage format. - */ - private final KeyConverter<String> stringKeyConverter = - new StringKeyConverter(); - - /** - * Used to convert Long key components to and from storage format. - */ - private final KeyConverter<Long> longKeyConverter = new LongKeyConverter(); - - public HBaseTimelineWriterImpl() { - super(HBaseTimelineWriterImpl.class.getName()); - } - - /** - * initializes the hbase connection to write to the entity table. - */ - @Override - protected void serviceInit(Configuration conf) throws Exception { - super.serviceInit(conf); - Configuration hbaseConf = - HBaseTimelineStorageUtils.getTimelineServiceHBaseConf(conf); - conn = ConnectionFactory.createConnection(hbaseConf); - entityTable = new EntityTable().getTableMutator(hbaseConf, conn); - appToFlowTable = new AppToFlowTable().getTableMutator(hbaseConf, conn); - applicationTable = new ApplicationTable().getTableMutator(hbaseConf, conn); - flowRunTable = new FlowRunTable().getTableMutator(hbaseConf, conn); - flowActivityTable = - new FlowActivityTable().getTableMutator(hbaseConf, conn); - } - - /** - * Stores the entire information in TimelineEntities to the timeline store. - */ - @Override - public TimelineWriteResponse write(String clusterId, String userId, - String flowName, String flowVersion, long flowRunId, String appId, - TimelineEntities data) throws IOException { - - TimelineWriteResponse putStatus = new TimelineWriteResponse(); - // defensive coding to avoid NPE during row key construction - if ((flowName == null) || (appId == null) || (clusterId == null) - || (userId == null)) { - LOG.warn("Found null for one of: flowName=" + flowName + " appId=" + appId - + " userId=" + userId + " clusterId=" + clusterId - + " . Not proceeding with writing to hbase"); - return putStatus; - } - - for (TimelineEntity te : data.getEntities()) { - - // a set can have at most 1 null - if (te == null) { - continue; - } - - // if the entity is the application, the destination is the application - // table - boolean isApplication = ApplicationEntity.isApplicationEntity(te); - byte[] rowKey; - if (isApplication) { - ApplicationRowKey applicationRowKey = - new ApplicationRowKey(clusterId, userId, flowName, flowRunId, - appId); - rowKey = applicationRowKey.getRowKey(); - } else { - EntityRowKey entityRowKey = - new EntityRowKey(clusterId, userId, flowName, flowRunId, appId, - te.getType(), te.getIdPrefix(), te.getId()); - rowKey = entityRowKey.getRowKey(); - } - - storeInfo(rowKey, te, flowVersion, isApplication); - storeEvents(rowKey, te.getEvents(), isApplication); - storeConfig(rowKey, te.getConfigs(), isApplication); - storeMetrics(rowKey, te.getMetrics(), isApplication); - storeRelations(rowKey, te, isApplication); - - if (isApplication) { - TimelineEvent event = - ApplicationEntity.getApplicationEvent(te, - ApplicationMetricsConstants.CREATED_EVENT_TYPE); - FlowRunRowKey flowRunRowKey = - new FlowRunRowKey(clusterId, userId, flowName, flowRunId); - if (event != null) { - onApplicationCreated(flowRunRowKey, clusterId, appId, userId, - flowVersion, te, event.getTimestamp()); - } - // if it's an application entity, store metrics - storeFlowMetricsAppRunning(flowRunRowKey, appId, te); - // if application has finished, store it's finish time and write final - // values of all metrics - event = ApplicationEntity.getApplicationEvent(te, - ApplicationMetricsConstants.FINISHED_EVENT_TYPE); - if (event != null) { - onApplicationFinished(flowRunRowKey, flowVersion, appId, te, - event.getTimestamp()); - } - } - } - return putStatus; - } - - private void onApplicationCreated(FlowRunRowKey flowRunRowKey, - String clusterId, String appId, String userId, String flowVersion, - TimelineEntity te, long appCreatedTimeStamp) - throws IOException { - - String flowName = flowRunRowKey.getFlowName(); - Long flowRunId = flowRunRowKey.getFlowRunId(); - - // store in App to flow table - AppToFlowRowKey appToFlowRowKey = new AppToFlowRowKey(appId); - byte[] rowKey = appToFlowRowKey.getRowKey(); - AppToFlowColumnPrefix.FLOW_NAME.store(rowKey, appToFlowTable, clusterId, - null, flowName); - AppToFlowColumnPrefix.FLOW_RUN_ID.store(rowKey, appToFlowTable, clusterId, - null, flowRunId); - AppToFlowColumnPrefix.USER_ID.store(rowKey, appToFlowTable, clusterId, null, - userId); - - // store in flow run table - storeAppCreatedInFlowRunTable(flowRunRowKey, appId, te); - - // store in flow activity table - byte[] flowActivityRowKeyBytes = - new FlowActivityRowKey(flowRunRowKey.getClusterId(), - appCreatedTimeStamp, flowRunRowKey.getUserId(), flowName) - .getRowKey(); - byte[] qualifier = longKeyConverter.encode(flowRunRowKey.getFlowRunId()); - FlowActivityColumnPrefix.RUN_ID.store(flowActivityRowKeyBytes, - flowActivityTable, qualifier, null, flowVersion, - AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId)); - } - - /* - * updates the {@link FlowRunTable} with Application Created information - */ - private void storeAppCreatedInFlowRunTable(FlowRunRowKey flowRunRowKey, - String appId, TimelineEntity te) throws IOException { - byte[] rowKey = flowRunRowKey.getRowKey(); - FlowRunColumn.MIN_START_TIME.store(rowKey, flowRunTable, null, - te.getCreatedTime(), - AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId)); - } - - - /* - * updates the {@link FlowRunTable} and {@link FlowActivityTable} when an - * application has finished - */ - private void onApplicationFinished(FlowRunRowKey flowRunRowKey, - String flowVersion, String appId, TimelineEntity te, - long appFinishedTimeStamp) throws IOException { - // store in flow run table - storeAppFinishedInFlowRunTable(flowRunRowKey, appId, te, - appFinishedTimeStamp); - - // indicate in the flow activity table that the app has finished - byte[] rowKey = - new FlowActivityRowKey(flowRunRowKey.getClusterId(), - appFinishedTimeStamp, flowRunRowKey.getUserId(), - flowRunRowKey.getFlowName()).getRowKey(); - byte[] qualifier = longKeyConverter.encode(flowRunRowKey.getFlowRunId()); - FlowActivityColumnPrefix.RUN_ID.store(rowKey, flowActivityTable, qualifier, - null, flowVersion, - AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId)); - } - - /* - * Update the {@link FlowRunTable} with Application Finished information - */ - private void storeAppFinishedInFlowRunTable(FlowRunRowKey flowRunRowKey, - String appId, TimelineEntity te, long appFinishedTimeStamp) - throws IOException { - byte[] rowKey = flowRunRowKey.getRowKey(); - Attribute attributeAppId = - AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId); - FlowRunColumn.MAX_END_TIME.store(rowKey, flowRunTable, null, - appFinishedTimeStamp, attributeAppId); - - // store the final value of metrics since application has finished - Set<TimelineMetric> metrics = te.getMetrics(); - if (metrics != null) { - storeFlowMetrics(rowKey, metrics, attributeAppId, - AggregationOperation.SUM_FINAL.getAttribute()); - } - } - - /* - * Updates the {@link FlowRunTable} with Application Metrics - */ - private void storeFlowMetricsAppRunning(FlowRunRowKey flowRunRowKey, - String appId, TimelineEntity te) throws IOException { - Set<TimelineMetric> metrics = te.getMetrics(); - if (metrics != null) { - byte[] rowKey = flowRunRowKey.getRowKey(); - storeFlowMetrics(rowKey, metrics, - AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId), - AggregationOperation.SUM.getAttribute()); - } - } - - private void storeFlowMetrics(byte[] rowKey, Set<TimelineMetric> metrics, - Attribute... attributes) throws IOException { - for (TimelineMetric metric : metrics) { - byte[] metricColumnQualifier = stringKeyConverter.encode(metric.getId()); - Map<Long, Number> timeseries = metric.getValues(); - for (Map.Entry<Long, Number> timeseriesEntry : timeseries.entrySet()) { - Long timestamp = timeseriesEntry.getKey(); - FlowRunColumnPrefix.METRIC.store(rowKey, flowRunTable, - metricColumnQualifier, timestamp, timeseriesEntry.getValue(), - attributes); - } - } - } - - private void storeRelations(byte[] rowKey, TimelineEntity te, - boolean isApplication) throws IOException { - if (isApplication) { - storeRelations(rowKey, te.getIsRelatedToEntities(), - ApplicationColumnPrefix.IS_RELATED_TO, applicationTable); - storeRelations(rowKey, te.getRelatesToEntities(), - ApplicationColumnPrefix.RELATES_TO, applicationTable); - } else { - storeRelations(rowKey, te.getIsRelatedToEntities(), - EntityColumnPrefix.IS_RELATED_TO, entityTable); - storeRelations(rowKey, te.getRelatesToEntities(), - EntityColumnPrefix.RELATES_TO, entityTable); - } - } - - /** - * Stores the Relations from the {@linkplain TimelineEntity} object. - */ - private <T> void storeRelations(byte[] rowKey, - Map<String, Set<String>> connectedEntities, - ColumnPrefix<T> columnPrefix, TypedBufferedMutator<T> table) - throws IOException { - for (Map.Entry<String, Set<String>> connectedEntity : connectedEntities - .entrySet()) { - // id3?id4?id5 - String compoundValue = - Separator.VALUES.joinEncoded(connectedEntity.getValue()); - columnPrefix.store(rowKey, table, - stringKeyConverter.encode(connectedEntity.getKey()), null, - compoundValue); - } - } - - /** - * Stores information from the {@linkplain TimelineEntity} object. - */ - private void storeInfo(byte[] rowKey, TimelineEntity te, String flowVersion, - boolean isApplication) throws IOException { - - if (isApplication) { - ApplicationColumn.ID.store(rowKey, applicationTable, null, te.getId()); - ApplicationColumn.CREATED_TIME.store(rowKey, applicationTable, null, - te.getCreatedTime()); - ApplicationColumn.FLOW_VERSION.store(rowKey, applicationTable, null, - flowVersion); - Map<String, Object> info = te.getInfo(); - if (info != null) { - for (Map.Entry<String, Object> entry : info.entrySet()) { - ApplicationColumnPrefix.INFO.store(rowKey, applicationTable, - stringKeyConverter.encode(entry.getKey()), null, - entry.getValue()); - } - } - } else { - EntityColumn.ID.store(rowKey, entityTable, null, te.getId()); - EntityColumn.TYPE.store(rowKey, entityTable, null, te.getType()); - EntityColumn.CREATED_TIME.store(rowKey, entityTable, null, - te.getCreatedTime()); - EntityColumn.FLOW_VERSION.store(rowKey, entityTable, null, flowVersion); - Map<String, Object> info = te.getInfo(); - if (info != null) { - for (Map.Entry<String, Object> entry : info.entrySet()) { - EntityColumnPrefix.INFO.store(rowKey, entityTable, - stringKeyConverter.encode(entry.getKey()), null, - entry.getValue()); - } - } - } - } - - /** - * stores the config information from {@linkplain TimelineEntity}. - */ - private void storeConfig(byte[] rowKey, Map<String, String> config, - boolean isApplication) throws IOException { - if (config == null) { - return; - } - for (Map.Entry<String, String> entry : config.entrySet()) { - byte[] configKey = stringKeyConverter.encode(entry.getKey()); - if (isApplication) { - ApplicationColumnPrefix.CONFIG.store(rowKey, applicationTable, - configKey, null, entry.getValue()); - } else { - EntityColumnPrefix.CONFIG.store(rowKey, entityTable, configKey, - null, entry.getValue()); - } - } - } - - /** - * stores the {@linkplain TimelineMetric} information from the - * {@linkplain TimelineEvent} object. - */ - private void storeMetrics(byte[] rowKey, Set<TimelineMetric> metrics, - boolean isApplication) throws IOException { - if (metrics != null) { - for (TimelineMetric metric : metrics) { - byte[] metricColumnQualifier = - stringKeyConverter.encode(metric.getId()); - Map<Long, Number> timeseries = metric.getValues(); - for (Map.Entry<Long, Number> timeseriesEntry : timeseries.entrySet()) { - Long timestamp = timeseriesEntry.getKey(); - if (isApplication) { - ApplicationColumnPrefix.METRIC.store(rowKey, applicationTable, - metricColumnQualifier, timestamp, timeseriesEntry.getValue()); - } else { - EntityColumnPrefix.METRIC.store(rowKey, entityTable, - metricColumnQualifier, timestamp, timeseriesEntry.getValue()); - } - } - } - } - } - - /** - * Stores the events from the {@linkplain TimelineEvent} object. - */ - private void storeEvents(byte[] rowKey, Set<TimelineEvent> events, - boolean isApplication) throws IOException { - if (events != null) { - for (TimelineEvent event : events) { - if (event != null) { - String eventId = event.getId(); - if (eventId != null) { - long eventTimestamp = event.getTimestamp(); - // if the timestamp is not set, use the current timestamp - if (eventTimestamp == TimelineEvent.INVALID_TIMESTAMP) { - LOG.warn("timestamp is not set for event " + eventId + - "! Using the current timestamp"); - eventTimestamp = System.currentTimeMillis(); - } - Map<String, Object> eventInfo = event.getInfo(); - if ((eventInfo == null) || (eventInfo.size() == 0)) { - byte[] columnQualifierBytes = - new EventColumnName(eventId, eventTimestamp, null) - .getColumnQualifier(); - if (isApplication) { - ApplicationColumnPrefix.EVENT.store(rowKey, applicationTable, - columnQualifierBytes, null, Separator.EMPTY_BYTES); - } else { - EntityColumnPrefix.EVENT.store(rowKey, entityTable, - columnQualifierBytes, null, Separator.EMPTY_BYTES); - } - } else { - for (Map.Entry<String, Object> info : eventInfo.entrySet()) { - // eventId=infoKey - byte[] columnQualifierBytes = - new EventColumnName(eventId, eventTimestamp, info.getKey()) - .getColumnQualifier(); - if (isApplication) { - ApplicationColumnPrefix.EVENT.store(rowKey, applicationTable, - columnQualifierBytes, null, info.getValue()); - } else { - EntityColumnPrefix.EVENT.store(rowKey, entityTable, - columnQualifierBytes, null, info.getValue()); - } - } // for info: eventInfo - } - } - } - } // event : events - } - } - - /* - * (non-Javadoc) - * - * @see - * org.apache.hadoop.yarn.server.timelineservice.storage - * .TimelineWriter#aggregate - * (org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity, - * org.apache - * .hadoop.yarn.server.timelineservice.storage.TimelineAggregationTrack) - */ - @Override - public TimelineWriteResponse aggregate(TimelineEntity data, - TimelineAggregationTrack track) throws IOException { - return null; - } - - /* - * (non-Javadoc) - * - * @see - * org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter#flush - * () - */ - @Override - public void flush() throws IOException { - // flush all buffered mutators - entityTable.flush(); - appToFlowTable.flush(); - applicationTable.flush(); - flowRunTable.flush(); - flowActivityTable.flush(); - } - - /** - * close the hbase connections The close APIs perform flushing and release any - * resources held. - */ - @Override - protected void serviceStop() throws Exception { - if (entityTable != null) { - LOG.info("closing the entity table"); - // The close API performs flushing and releases any resources held - entityTable.close(); - } - if (appToFlowTable != null) { - LOG.info("closing the app_flow table"); - // The close API performs flushing and releases any resources held - appToFlowTable.close(); - } - if (applicationTable != null) { - LOG.info("closing the application table"); - applicationTable.close(); - } - if (flowRunTable != null) { - LOG.info("closing the flow run table"); - // The close API performs flushing and releases any resources held - flowRunTable.close(); - } - if (flowActivityTable != null) { - LOG.info("closing the flowActivityTable table"); - // The close API performs flushing and releases any resources held - flowActivityTable.close(); - } - if (conn != null) { - LOG.info("closing the hbase Connection"); - conn.close(); - } - super.serviceStop(); - } - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/47ec7f92/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java deleted file mode 100644 index dd87169..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java +++ /dev/null @@ -1,251 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.server.timelineservice.storage; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.CommandLineParser; -import org.apache.commons.cli.HelpFormatter; -import org.apache.commons.cli.Option; -import org.apache.commons.cli.Options; -import org.apache.commons.cli.ParseException; -import org.apache.commons.cli.PosixParser; -import org.apache.commons.lang.StringUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.client.Admin; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.ConnectionFactory; -import org.apache.hadoop.util.GenericOptionsParser; -import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable; -import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils; -import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable; -import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTable; -import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable; - -import com.google.common.annotations.VisibleForTesting; - -/** - * This creates the schema for a hbase based backend for storing application - * timeline information. - */ -@InterfaceAudience.Private -@InterfaceStability.Unstable -public final class TimelineSchemaCreator { - private TimelineSchemaCreator() { - } - - final static String NAME = TimelineSchemaCreator.class.getSimpleName(); - private static final Log LOG = LogFactory.getLog(TimelineSchemaCreator.class); - private static final String SKIP_EXISTING_TABLE_OPTION_SHORT = "s"; - private static final String APP_TABLE_NAME_SHORT = "a"; - private static final String APP_TO_FLOW_TABLE_NAME_SHORT = "a2f"; - private static final String TTL_OPTION_SHORT = "m"; - private static final String ENTITY_TABLE_NAME_SHORT = "e"; - - public static void main(String[] args) throws Exception { - - Configuration hbaseConf = - HBaseTimelineStorageUtils.getTimelineServiceHBaseConf(null); - // Grab input args and allow for -Dxyz style arguments - String[] otherArgs = new GenericOptionsParser(hbaseConf, args) - .getRemainingArgs(); - - // Grab the arguments we're looking for. - CommandLine commandLine = parseArgs(otherArgs); - - // Grab the entityTableName argument - String entityTableName - = commandLine.getOptionValue(ENTITY_TABLE_NAME_SHORT); - if (StringUtils.isNotBlank(entityTableName)) { - hbaseConf.set(EntityTable.TABLE_NAME_CONF_NAME, entityTableName); - } - String entityTableTTLMetrics = commandLine.getOptionValue(TTL_OPTION_SHORT); - if (StringUtils.isNotBlank(entityTableTTLMetrics)) { - int metricsTTL = Integer.parseInt(entityTableTTLMetrics); - new EntityTable().setMetricsTTL(metricsTTL, hbaseConf); - } - // Grab the appToflowTableName argument - String appToflowTableName = commandLine.getOptionValue( - APP_TO_FLOW_TABLE_NAME_SHORT); - if (StringUtils.isNotBlank(appToflowTableName)) { - hbaseConf.set(AppToFlowTable.TABLE_NAME_CONF_NAME, appToflowTableName); - } - // Grab the applicationTableName argument - String applicationTableName = commandLine.getOptionValue( - APP_TABLE_NAME_SHORT); - if (StringUtils.isNotBlank(applicationTableName)) { - hbaseConf.set(ApplicationTable.TABLE_NAME_CONF_NAME, - applicationTableName); - } - - List<Exception> exceptions = new ArrayList<>(); - try { - boolean skipExisting - = commandLine.hasOption(SKIP_EXISTING_TABLE_OPTION_SHORT); - if (skipExisting) { - LOG.info("Will skip existing tables and continue on htable creation " - + "exceptions!"); - } - createAllTables(hbaseConf, skipExisting); - LOG.info("Successfully created HBase schema. "); - } catch (IOException e) { - LOG.error("Error in creating hbase tables: " + e.getMessage()); - exceptions.add(e); - } - - if (exceptions.size() > 0) { - LOG.warn("Schema creation finished with the following exceptions"); - for (Exception e : exceptions) { - LOG.warn(e.getMessage()); - } - System.exit(-1); - } else { - LOG.info("Schema creation finished successfully"); - } - } - - /** - * Parse command-line arguments. - * - * @param args - * command line arguments passed to program. - * @return parsed command line. - * @throws ParseException - */ - private static CommandLine parseArgs(String[] args) throws ParseException { - Options options = new Options(); - - // Input - Option o = new Option(ENTITY_TABLE_NAME_SHORT, "entityTableName", true, - "entity table name"); - o.setArgName("entityTableName"); - o.setRequired(false); - options.addOption(o); - - o = new Option(TTL_OPTION_SHORT, "metricsTTL", true, - "TTL for metrics column family"); - o.setArgName("metricsTTL"); - o.setRequired(false); - options.addOption(o); - - o = new Option(APP_TO_FLOW_TABLE_NAME_SHORT, "appToflowTableName", true, - "app to flow table name"); - o.setArgName("appToflowTableName"); - o.setRequired(false); - options.addOption(o); - - o = new Option(APP_TABLE_NAME_SHORT, "applicationTableName", true, - "application table name"); - o.setArgName("applicationTableName"); - o.setRequired(false); - options.addOption(o); - - // Options without an argument - // No need to set arg name since we do not need an argument here - o = new Option(SKIP_EXISTING_TABLE_OPTION_SHORT, "skipExistingTable", - false, "skip existing Hbase tables and continue to create new tables"); - o.setRequired(false); - options.addOption(o); - - CommandLineParser parser = new PosixParser(); - CommandLine commandLine = null; - try { - commandLine = parser.parse(options, args); - } catch (Exception e) { - LOG.error("ERROR: " + e.getMessage() + "\n"); - HelpFormatter formatter = new HelpFormatter(); - formatter.printHelp(NAME + " ", options, true); - System.exit(-1); - } - - return commandLine; - } - - @VisibleForTesting - public static void createAllTables(Configuration hbaseConf, - boolean skipExisting) throws IOException { - - Connection conn = null; - try { - conn = ConnectionFactory.createConnection(hbaseConf); - Admin admin = conn.getAdmin(); - if (admin == null) { - throw new IOException("Cannot create table since admin is null"); - } - try { - new EntityTable().createTable(admin, hbaseConf); - } catch (IOException e) { - if (skipExisting) { - LOG.warn("Skip and continue on: " + e.getMessage()); - } else { - throw e; - } - } - try { - new AppToFlowTable().createTable(admin, hbaseConf); - } catch (IOException e) { - if (skipExisting) { - LOG.warn("Skip and continue on: " + e.getMessage()); - } else { - throw e; - } - } - try { - new ApplicationTable().createTable(admin, hbaseConf); - } catch (IOException e) { - if (skipExisting) { - LOG.warn("Skip and continue on: " + e.getMessage()); - } else { - throw e; - } - } - try { - new FlowRunTable().createTable(admin, hbaseConf); - } catch (IOException e) { - if (skipExisting) { - LOG.warn("Skip and continue on: " + e.getMessage()); - } else { - throw e; - } - } - try { - new FlowActivityTable().createTable(admin, hbaseConf); - } catch (IOException e) { - if (skipExisting) { - LOG.warn("Skip and continue on: " + e.getMessage()); - } else { - throw e; - } - } - } finally { - if (conn != null) { - conn.close(); - } - } - } - - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/47ec7f92/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumn.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumn.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumn.java deleted file mode 100644 index dde3911..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumn.java +++ /dev/null @@ -1,156 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.server.timelineservice.storage.application; - -import java.io.IOException; - -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.Column; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.GenericConverter; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter; -import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute; - -/** - * Identifies fully qualified columns for the {@link ApplicationTable}. - */ -public enum ApplicationColumn implements Column<ApplicationTable> { - - /** - * App id. - */ - ID(ApplicationColumnFamily.INFO, "id"), - - /** - * When the application was created. - */ - CREATED_TIME(ApplicationColumnFamily.INFO, "created_time", - new LongConverter()), - - /** - * The version of the flow that this app belongs to. - */ - FLOW_VERSION(ApplicationColumnFamily.INFO, "flow_version"); - - private final ColumnHelper<ApplicationTable> column; - private final ColumnFamily<ApplicationTable> columnFamily; - private final String columnQualifier; - private final byte[] columnQualifierBytes; - - private ApplicationColumn(ColumnFamily<ApplicationTable> columnFamily, - String columnQualifier) { - this(columnFamily, columnQualifier, GenericConverter.getInstance()); - } - - private ApplicationColumn(ColumnFamily<ApplicationTable> columnFamily, - String columnQualifier, ValueConverter converter) { - this.columnFamily = columnFamily; - this.columnQualifier = columnQualifier; - // Future-proof by ensuring the right column prefix hygiene. - this.columnQualifierBytes = - Bytes.toBytes(Separator.SPACE.encode(columnQualifier)); - this.column = new ColumnHelper<ApplicationTable>(columnFamily, converter); - } - - /** - * @return the column name value - */ - private String getColumnQualifier() { - return columnQualifier; - } - - public void store(byte[] rowKey, - TypedBufferedMutator<ApplicationTable> tableMutator, Long timestamp, - Object inputValue, Attribute... attributes) throws IOException { - column.store(rowKey, tableMutator, columnQualifierBytes, timestamp, - inputValue, attributes); - } - - public Object readResult(Result result) throws IOException { - return column.readResult(result, columnQualifierBytes); - } - - @Override - public byte[] getColumnQualifierBytes() { - return columnQualifierBytes.clone(); - } - - @Override - public byte[] getColumnFamilyBytes() { - return columnFamily.getBytes(); - } - - @Override - public ValueConverter getValueConverter() { - return column.getValueConverter(); - } - - /** - * Retrieve an {@link ApplicationColumn} given a name, or null if there is no - * match. The following holds true: {@code columnFor(x) == columnFor(y)} if - * and only if {@code x.equals(y)} or {@code (x == y == null)}. - * - * @param columnQualifier Name of the column to retrieve - * @return the corresponding {@link ApplicationColumn} or null - */ - public static final ApplicationColumn columnFor(String columnQualifier) { - - // Match column based on value, assume column family matches. - for (ApplicationColumn ac : ApplicationColumn.values()) { - // Find a match based only on name. - if (ac.getColumnQualifier().equals(columnQualifier)) { - return ac; - } - } - - // Default to null - return null; - } - - /** - * Retrieve an {@link ApplicationColumn} given a name, or null if there is no - * match. The following holds true: {@code columnFor(a,x) == columnFor(b,y)} - * if and only if {@code a.equals(b) & x.equals(y)} or - * {@code (x == y == null)} - * - * @param columnFamily The columnFamily for which to retrieve the column. - * @param name Name of the column to retrieve - * @return the corresponding {@link ApplicationColumn} or null if both - * arguments don't match. - */ - public static final ApplicationColumn columnFor( - ApplicationColumnFamily columnFamily, String name) { - - for (ApplicationColumn ac : ApplicationColumn.values()) { - // Find a match based column family and on name. - if (ac.columnFamily.equals(columnFamily) - && ac.getColumnQualifier().equals(name)) { - return ac; - } - } - - // Default to null - return null; - } - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/47ec7f92/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnFamily.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnFamily.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnFamily.java deleted file mode 100644 index 97e5f7b..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnFamily.java +++ /dev/null @@ -1,65 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.server.timelineservice.storage.application; - -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; - -/** - * Represents the application table column families. - */ -public enum ApplicationColumnFamily implements ColumnFamily<ApplicationTable> { - - /** - * Info column family houses known columns, specifically ones included in - * columnfamily filters. - */ - INFO("i"), - - /** - * Configurations are in a separate column family for two reasons: a) the size - * of the config values can be very large and b) we expect that config values - * are often separately accessed from other metrics and info columns. - */ - CONFIGS("c"), - - /** - * Metrics have a separate column family, because they have a separate TTL. - */ - METRICS("m"); - - /** - * Byte representation of this column family. - */ - private final byte[] bytes; - - /** - * @param value create a column family with this name. Must be lower case and - * without spaces. - */ - private ApplicationColumnFamily(String value) { - // column families should be lower case and not contain any spaces. - this.bytes = Bytes.toBytes(Separator.SPACE.encode(value)); - } - - public byte[] getBytes() { - return Bytes.copy(bytes); - } - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/47ec7f92/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java deleted file mode 100644 index 42488f4..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java +++ /dev/null @@ -1,288 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.server.timelineservice.storage.application; - -import java.io.IOException; -import java.util.Map; -import java.util.NavigableMap; - -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.GenericConverter; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter; -import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute; - -/** - * Identifies partially qualified columns for the application table. - */ -public enum ApplicationColumnPrefix implements ColumnPrefix<ApplicationTable> { - - /** - * To store TimelineEntity getIsRelatedToEntities values. - */ - IS_RELATED_TO(ApplicationColumnFamily.INFO, "s"), - - /** - * To store TimelineEntity getRelatesToEntities values. - */ - RELATES_TO(ApplicationColumnFamily.INFO, "r"), - - /** - * To store TimelineEntity info values. - */ - INFO(ApplicationColumnFamily.INFO, "i"), - - /** - * Lifecycle events for an application. - */ - EVENT(ApplicationColumnFamily.INFO, "e"), - - /** - * Config column stores configuration with config key as the column name. - */ - CONFIG(ApplicationColumnFamily.CONFIGS, null), - - /** - * Metrics are stored with the metric name as the column name. - */ - METRIC(ApplicationColumnFamily.METRICS, null, new LongConverter()); - - private final ColumnHelper<ApplicationTable> column; - private final ColumnFamily<ApplicationTable> columnFamily; - - /** - * Can be null for those cases where the provided column qualifier is the - * entire column name. - */ - private final String columnPrefix; - private final byte[] columnPrefixBytes; - - /** - * Private constructor, meant to be used by the enum definition. - * - * @param columnFamily that this column is stored in. - * @param columnPrefix for this column. - */ - private ApplicationColumnPrefix(ColumnFamily<ApplicationTable> columnFamily, - String columnPrefix) { - this(columnFamily, columnPrefix, GenericConverter.getInstance()); - } - - /** - * Private constructor, meant to be used by the enum definition. - * - * @param columnFamily that this column is stored in. - * @param columnPrefix for this column. - * @param converter used to encode/decode values to be stored in HBase for - * this column prefix. - */ - private ApplicationColumnPrefix(ColumnFamily<ApplicationTable> columnFamily, - String columnPrefix, ValueConverter converter) { - column = new ColumnHelper<ApplicationTable>(columnFamily, converter); - this.columnFamily = columnFamily; - this.columnPrefix = columnPrefix; - if (columnPrefix == null) { - this.columnPrefixBytes = null; - } else { - // Future-proof by ensuring the right column prefix hygiene. - this.columnPrefixBytes = - Bytes.toBytes(Separator.SPACE.encode(columnPrefix)); - } - } - - /** - * @return the column name value - */ - private String getColumnPrefix() { - return columnPrefix; - } - - @Override - public byte[] getColumnPrefixBytes(byte[] qualifierPrefix) { - return ColumnHelper.getColumnQualifier( - this.columnPrefixBytes, qualifierPrefix); - } - - @Override - public byte[] getColumnPrefixBytes(String qualifierPrefix) { - return ColumnHelper.getColumnQualifier( - this.columnPrefixBytes, qualifierPrefix); - } - - @Override - public byte[] getColumnFamilyBytes() { - return columnFamily.getBytes(); - } - - /* - * (non-Javadoc) - * - * @see - * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix - * #store(byte[], - * org.apache.hadoop.yarn.server.timelineservice.storage.common. - * TypedBufferedMutator, java.lang.String, java.lang.Long, java.lang.Object) - */ - public void store(byte[] rowKey, - TypedBufferedMutator<ApplicationTable> tableMutator, byte[] qualifier, - Long timestamp, Object inputValue, Attribute... attributes) - throws IOException { - - // Null check - if (qualifier == null) { - throw new IOException("Cannot store column with null qualifier in " - + tableMutator.getName().getNameAsString()); - } - - byte[] columnQualifier = getColumnPrefixBytes(qualifier); - - column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue, - attributes); - } - - /* - * (non-Javadoc) - * - * @see - * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix - * #store(byte[], - * org.apache.hadoop.yarn.server.timelineservice.storage.common. - * TypedBufferedMutator, java.lang.String, java.lang.Long, java.lang.Object) - */ - public void store(byte[] rowKey, - TypedBufferedMutator<ApplicationTable> tableMutator, String qualifier, - Long timestamp, Object inputValue, Attribute...attributes) - throws IOException { - - // Null check - if (qualifier == null) { - throw new IOException("Cannot store column with null qualifier in " - + tableMutator.getName().getNameAsString()); - } - - byte[] columnQualifier = getColumnPrefixBytes(qualifier); - - column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue, - attributes); - } - - /* - * (non-Javadoc) - * - * @see - * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix - * #readResult(org.apache.hadoop.hbase.client.Result, java.lang.String) - */ - public Object readResult(Result result, String qualifier) throws IOException { - byte[] columnQualifier = - ColumnHelper.getColumnQualifier(this.columnPrefixBytes, qualifier); - return column.readResult(result, columnQualifier); - } - - public ValueConverter getValueConverter() { - return column.getValueConverter(); - } - - /* - * (non-Javadoc) - * - * @see - * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix - * #readResults(org.apache.hadoop.hbase.client.Result, - * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter) - */ - public <K> Map<K, Object> readResults(Result result, - KeyConverter<K> keyConverter) throws IOException { - return column.readResults(result, columnPrefixBytes, keyConverter); - } - - /* - * (non-Javadoc) - * - * @see - * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix - * #readResultsWithTimestamps(org.apache.hadoop.hbase.client.Result, - * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter) - */ - public <K, V> NavigableMap<K, NavigableMap<Long, V>> - readResultsWithTimestamps(Result result, KeyConverter<K> keyConverter) - throws IOException { - return column.readResultsWithTimestamps(result, columnPrefixBytes, - keyConverter); - } - - /** - * Retrieve an {@link ApplicationColumnPrefix} given a name, or null if there - * is no match. The following holds true: {@code columnFor(x) == columnFor(y)} - * if and only if {@code x.equals(y)} or {@code (x == y == null)} - * - * @param columnPrefix Name of the column to retrieve - * @return the corresponding {@link ApplicationColumnPrefix} or null - */ - public static final ApplicationColumnPrefix columnFor(String columnPrefix) { - - // Match column based on value, assume column family matches. - for (ApplicationColumnPrefix acp : ApplicationColumnPrefix.values()) { - // Find a match based only on name. - if (acp.getColumnPrefix().equals(columnPrefix)) { - return acp; - } - } - - // Default to null - return null; - } - - /** - * Retrieve an {@link ApplicationColumnPrefix} given a name, or null if there - * is no match. The following holds true: - * {@code columnFor(a,x) == columnFor(b,y)} if and only if - * {@code (x == y == null)} or {@code a.equals(b) & x.equals(y)} - * - * @param columnFamily The columnFamily for which to retrieve the column. - * @param columnPrefix Name of the column to retrieve - * @return the corresponding {@link ApplicationColumnPrefix} or null if both - * arguments don't match. - */ - public static final ApplicationColumnPrefix columnFor( - ApplicationColumnFamily columnFamily, String columnPrefix) { - - // TODO: needs unit test to confirm and need to update javadoc to explain - // null prefix case. - - for (ApplicationColumnPrefix acp : ApplicationColumnPrefix.values()) { - // Find a match based column family and on name. - if (acp.columnFamily.equals(columnFamily) - && (((columnPrefix == null) && (acp.getColumnPrefix() == null)) || - (acp.getColumnPrefix().equals(columnPrefix)))) { - return acp; - } - } - - // Default to null - return null; - } - -} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org