Repository: hadoop Updated Branches: refs/heads/YARN-2928 10b26bb9f -> e2229377b
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2229377/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKeyConverter.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKeyConverter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKeyConverter.java new file mode 100644 index 0000000..642f065 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKeyConverter.java @@ -0,0 +1,120 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.storage.flow; + +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; + +/** + * Encodes and decodes row key for flow run table. + * The row key is of the form : clusterId!userId!flowName!flowrunId. + * flowrunId is a long and rest are strings. + */ +public final class FlowRunRowKeyConverter implements + KeyConverter<FlowRunRowKey> { + private static final FlowRunRowKeyConverter INSTANCE = + new FlowRunRowKeyConverter(); + + public static FlowRunRowKeyConverter getInstance() { + return INSTANCE; + } + + private FlowRunRowKeyConverter() { + } + + // Flow run row key is of the form + // clusterId!userId!flowName!flowrunId with each segment separated by !. + // The sizes below indicate sizes of each one of these segments in sequence. + // clusterId, userId and flowName are strings. flowrunId is a long hence 8 + // bytes in size. Strings are variable in size (i.e. end whenever separator is + // encountered). This is used while decoding and helps in determining where to + // split. + private static final int[] SEGMENT_SIZES = { + Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE, + Bytes.SIZEOF_LONG }; + + /* + * (non-Javadoc) + * + * Encodes FlowRunRowKey object into a byte array with each component/field in + * FlowRunRowKey separated by Separator#QUALIFIERS. This leads to an + * flow run row key of the form clusterId!userId!flowName!flowrunId + * If flowRunId in passed FlowRunRowKey object is null (and the fields + * preceding it i.e. clusterId, userId and flowName are not null), this + * returns a row key prefix of the form clusterId!userName!flowName! + * flowRunId is inverted while encoding as it helps maintain a descending + * order for flow keys in flow run table. + * + * @see + * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter + * #encode(java.lang.Object) + */ + @Override + public byte[] encode(FlowRunRowKey rowKey) { + byte[] first = Separator.QUALIFIERS.join( + Separator.encode(rowKey.getClusterId(), Separator.SPACE, Separator.TAB, + Separator.QUALIFIERS), + Separator.encode(rowKey.getUserId(), Separator.SPACE, Separator.TAB, + Separator.QUALIFIERS), + Separator.encode(rowKey.getFlowName(), Separator.SPACE, Separator.TAB, + Separator.QUALIFIERS)); + if (rowKey.getFlowRunId() == null) { + return Separator.QUALIFIERS.join(first, Separator.EMPTY_BYTES); + } else { + // Note that flowRunId is a long, so we can't encode them all at the same + // time. + byte[] second = Bytes.toBytes(TimelineStorageUtils.invertLong( + rowKey.getFlowRunId())); + return Separator.QUALIFIERS.join(first, second); + } + } + + /* + * (non-Javadoc) + * + * Decodes an flow run row key of the form + * clusterId!userId!flowName!flowrunId represented in byte format and converts + * it into an FlowRunRowKey object. flowRunId is inverted while decoding as + * it was inverted while encoding. + * + * @see + * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter + * #decode(byte[]) + */ + @Override + public FlowRunRowKey decode(byte[] rowKey) { + byte[][] rowKeyComponents = + Separator.QUALIFIERS.split(rowKey, SEGMENT_SIZES); + if (rowKeyComponents.length != 4) { + throw new IllegalArgumentException("the row key is not valid for " + + "a flow run"); + } + String clusterId = Separator.decode(Bytes.toString(rowKeyComponents[0]), + Separator.QUALIFIERS, Separator.TAB, Separator.SPACE); + String userId = Separator.decode(Bytes.toString(rowKeyComponents[1]), + Separator.QUALIFIERS, Separator.TAB, Separator.SPACE); + String flowName = Separator.decode(Bytes.toString(rowKeyComponents[2]), + Separator.QUALIFIERS, Separator.TAB, Separator.SPACE); + Long flowRunId = + TimelineStorageUtils.invertLong(Bytes.toLong(rowKeyComponents[3])); + return new FlowRunRowKey(clusterId, userId, flowName, flowRunId); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2229377/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java index 398d7b4..648c77b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java @@ -44,9 +44,10 @@ import org.apache.hadoop.hbase.util.Bytes.ByteArrayComparator; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.timelineservice.storage.common.GenericConverter; import org.apache.hadoop.yarn.server.timelineservice.storage.common.NumericValueConverter; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter; import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimestampGenerator; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter; import com.google.common.annotations.VisibleForTesting; @@ -193,7 +194,7 @@ class FlowScanner implements RegionScanner, Closeable { // So all cells in one qualifier come one after the other before we see the // next column qualifier ByteArrayComparator comp = new ByteArrayComparator(); - byte[] currentColumnQualifier = TimelineStorageUtils.EMPTY_BYTES; + byte[] currentColumnQualifier = Separator.EMPTY_BYTES; AggregationOperation currentAggOp = null; SortedSet<Cell> currentColumnCells = new TreeSet<>(KeyValue.COMPARATOR); Set<String> alreadySeenAggDim = new HashSet<>(); @@ -314,7 +315,7 @@ class FlowScanner implements RegionScanner, Closeable { + " cell qualifier=" + Bytes.toString(CellUtil.cloneQualifier(cell)) + " cell value= " - + (Number) converter.decodeValue(CellUtil.cloneValue(cell)) + + converter.decodeValue(CellUtil.cloneValue(cell)) + " timestamp=" + cell.getTimestamp()); } @@ -480,7 +481,7 @@ class FlowScanner implements RegionScanner, Closeable { LOG.trace("MAJOR COMPACTION loop sum= " + sum + " discarding now: " + " qualifier=" + Bytes.toString(CellUtil.cloneQualifier(cell)) + " value=" - + (Number) converter.decodeValue(CellUtil.cloneValue(cell)) + + converter.decodeValue(CellUtil.cloneValue(cell)) + " timestamp=" + cell.getTimestamp() + " " + this.action); } } else { http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2229377/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java index d8ca038..faecd14 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java @@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrie import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters; import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext; import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongKeyConverter; import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityColumnPrefix; import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKey; import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTable; @@ -125,7 +126,7 @@ class FlowActivityEntityReader extends TimelineEntityReader { protected TimelineEntity parseEntity(Result result) throws IOException { FlowActivityRowKey rowKey = FlowActivityRowKey.parseRowKey(result.getRow()); - long time = rowKey.getDayTimestamp(); + Long time = rowKey.getDayTimestamp(); String user = rowKey.getUserId(); String flowName = rowKey.getFlowName(); @@ -135,10 +136,11 @@ class FlowActivityEntityReader extends TimelineEntityReader { flowActivity.setId(flowActivity.getId()); // get the list of run ids along with the version that are associated with // this flow on this day - Map<String, Object> runIdsMap = - FlowActivityColumnPrefix.RUN_ID.readResults(result); - for (Map.Entry<String, Object> e : runIdsMap.entrySet()) { - Long runId = Long.valueOf(e.getKey()); + Map<Long, Object> runIdsMap = + FlowActivityColumnPrefix.RUN_ID.readResults(result, + LongKeyConverter.getInstance()); + for (Map.Entry<Long, Object> e : runIdsMap.entrySet()) { + Long runId = e.getKey(); String version = (String)e.getValue(); FlowRunEntity flowRun = new FlowRunEntity(); flowRun.setUser(user); http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2229377/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java index 4299de9..be27643 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java @@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilter import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext; import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable; import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.StringKeyConverter; /** * The base class for reading and deserializing timeline entities from the @@ -329,7 +330,8 @@ public abstract class TimelineEntityReader { protected void readMetrics(TimelineEntity entity, Result result, ColumnPrefix<?> columnPrefix) throws IOException { NavigableMap<String, NavigableMap<Long, Number>> metricsResult = - columnPrefix.readResultsWithTimestamps(result); + columnPrefix.readResultsWithTimestamps( + result, StringKeyConverter.getInstance()); for (Map.Entry<String, NavigableMap<Long, Number>> metricResult: metricsResult.entrySet()) { TimelineMetric metric = new TimelineMetric(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2229377/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestKeyConverters.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestKeyConverters.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestKeyConverters.java new file mode 100644 index 0000000..74e4b5d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestKeyConverters.java @@ -0,0 +1,293 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.storage.common; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey; +import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKeyConverter; +import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey; +import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKeyConverter; +import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey; +import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKeyConverter; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKey; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKeyConverter; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKey; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKeyConverter; +import org.junit.Test; + +public class TestKeyConverters { + private final static String QUALIFIER_SEP = Separator.QUALIFIERS.getValue(); + private final static byte[] QUALIFIER_SEP_BYTES = + Bytes.toBytes(QUALIFIER_SEP); + private final static String CLUSTER = "cl" + QUALIFIER_SEP + "uster"; + private final static String USER = QUALIFIER_SEP + "user"; + private final static String FLOW_NAME = + "dummy_" + QUALIFIER_SEP + "flow" + QUALIFIER_SEP; + private final static Long FLOW_RUN_ID; + private final static String APPLICATION_ID; + static { + long runid = Long.MAX_VALUE - 900L; + byte[] longMaxByteArr = Bytes.toBytes(Long.MAX_VALUE); + byte[] byteArr = Bytes.toBytes(runid); + int sepByteLen = QUALIFIER_SEP_BYTES.length; + if (sepByteLen <= byteArr.length) { + for (int i = 0; i < sepByteLen; i++) { + byteArr[i] = (byte)(longMaxByteArr[i] - QUALIFIER_SEP_BYTES[i]); + } + } + FLOW_RUN_ID = Bytes.toLong(byteArr); + long clusterTs = System.currentTimeMillis(); + byteArr = Bytes.toBytes(clusterTs); + if (sepByteLen <= byteArr.length) { + for (int i = 0; i < sepByteLen; i++) { + byteArr[byteArr.length - sepByteLen + i] = + (byte)(longMaxByteArr[byteArr.length - sepByteLen + i] - + QUALIFIER_SEP_BYTES[i]); + } + } + clusterTs = Bytes.toLong(byteArr); + int seqId = 222; + APPLICATION_ID = ApplicationId.newInstance(clusterTs, seqId).toString(); + } + + private static void verifyRowPrefixBytes(byte[] byteRowKeyPrefix) { + int sepLen = QUALIFIER_SEP_BYTES.length; + for (int i = 0; i < sepLen; i++) { + assertTrue("Row key prefix not encoded properly.", + byteRowKeyPrefix[byteRowKeyPrefix.length - sepLen + i] == + QUALIFIER_SEP_BYTES[i]); + } + } + + @Test + public void testFlowActivityRowKeyConverter() { + Long ts = TimelineStorageUtils.getTopOfTheDayTimestamp(1459900830000L); + byte[] byteRowKey = FlowActivityRowKeyConverter.getInstance().encode( + new FlowActivityRowKey(CLUSTER, ts, USER, FLOW_NAME)); + FlowActivityRowKey rowKey = + FlowActivityRowKeyConverter.getInstance().decode(byteRowKey); + assertEquals(CLUSTER, rowKey.getClusterId()); + assertEquals(ts, rowKey.getDayTimestamp()); + assertEquals(USER, rowKey.getUserId()); + assertEquals(FLOW_NAME, rowKey.getFlowName()); + + byte[] byteRowKeyPrefix = FlowActivityRowKeyConverter.getInstance().encode( + new FlowActivityRowKey(CLUSTER, null, null, null)); + byte[][] splits = Separator.QUALIFIERS.split(byteRowKeyPrefix, new int[] { + Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE }); + assertEquals(2, splits.length); + assertEquals(0, splits[1].length); + assertEquals(CLUSTER, + Separator.QUALIFIERS.decode(Bytes.toString(splits[0]))); + verifyRowPrefixBytes(byteRowKeyPrefix); + + byteRowKeyPrefix = FlowActivityRowKeyConverter.getInstance().encode( + new FlowActivityRowKey(CLUSTER, ts, null, null)); + splits = Separator.QUALIFIERS.split(byteRowKeyPrefix, new int[] { + Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG, Separator.VARIABLE_SIZE }); + assertEquals(3, splits.length); + assertEquals(0, splits[2].length); + assertEquals(CLUSTER, + Separator.QUALIFIERS.decode(Bytes.toString(splits[0]))); + assertEquals(ts, (Long) TimelineStorageUtils.invertLong( + Bytes.toLong(splits[1]))); + verifyRowPrefixBytes(byteRowKeyPrefix); + } + + @Test + public void testFlowRunRowKeyConverter() { + byte[] byteRowKey = FlowRunRowKeyConverter.getInstance().encode( + new FlowRunRowKey(CLUSTER, USER, FLOW_NAME, FLOW_RUN_ID)); + FlowRunRowKey rowKey = + FlowRunRowKeyConverter.getInstance().decode(byteRowKey); + assertEquals(CLUSTER, rowKey.getClusterId()); + assertEquals(USER, rowKey.getUserId()); + assertEquals(FLOW_NAME, rowKey.getFlowName()); + assertEquals(FLOW_RUN_ID, rowKey.getFlowRunId()); + + byte[] byteRowKeyPrefix = FlowRunRowKeyConverter.getInstance().encode( + new FlowRunRowKey(CLUSTER, USER, FLOW_NAME, null)); + byte[][] splits = Separator.QUALIFIERS.split(byteRowKeyPrefix, new int[] { + Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE, + Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE }); + assertEquals(4, splits.length); + assertEquals(0, splits[3].length); + assertEquals(FLOW_NAME, + Separator.QUALIFIERS.decode(Bytes.toString(splits[2]))); + verifyRowPrefixBytes(byteRowKeyPrefix); + } + + @Test + public void testApplicationRowKeyConverter() { + byte[] byteRowKey = ApplicationRowKeyConverter.getInstance().encode( + new ApplicationRowKey(CLUSTER, USER, FLOW_NAME, FLOW_RUN_ID, + APPLICATION_ID)); + ApplicationRowKey rowKey = + ApplicationRowKeyConverter.getInstance().decode(byteRowKey); + assertEquals(CLUSTER, rowKey.getClusterId()); + assertEquals(USER, rowKey.getUserId()); + assertEquals(FLOW_NAME, rowKey.getFlowName()); + assertEquals(FLOW_RUN_ID, rowKey.getFlowRunId()); + assertEquals(APPLICATION_ID, rowKey.getAppId()); + + byte[] byteRowKeyPrefix = ApplicationRowKeyConverter.getInstance().encode( + new ApplicationRowKey(CLUSTER, USER, FLOW_NAME, FLOW_RUN_ID, null)); + byte[][] splits = + Separator.QUALIFIERS.split(byteRowKeyPrefix, new int[] { + Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE, + Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG, + Separator.VARIABLE_SIZE }); + assertEquals(5, splits.length); + assertEquals(0, splits[4].length); + assertEquals(FLOW_NAME, + Separator.QUALIFIERS.decode(Bytes.toString(splits[2]))); + assertEquals(FLOW_RUN_ID, (Long)TimelineStorageUtils.invertLong( + Bytes.toLong(splits[3]))); + verifyRowPrefixBytes(byteRowKeyPrefix); + + byteRowKeyPrefix = ApplicationRowKeyConverter.getInstance().encode( + new ApplicationRowKey(CLUSTER, USER, FLOW_NAME, null, null)); + splits = Separator.QUALIFIERS.split(byteRowKeyPrefix, new int[] { + Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE, + Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE }); + assertEquals(4, splits.length); + assertEquals(0, splits[3].length); + assertEquals(FLOW_NAME, + Separator.QUALIFIERS.decode(Bytes.toString(splits[2]))); + verifyRowPrefixBytes(byteRowKeyPrefix); + } + + @Test + public void testEntityRowKeyConverter() { + String entityId = "!ent!ity!!id!"; + String entityType = "entity!Type"; + byte[] byteRowKey = EntityRowKeyConverter.getInstance().encode( + new EntityRowKey(CLUSTER, USER, FLOW_NAME, FLOW_RUN_ID, APPLICATION_ID, + entityType, entityId)); + EntityRowKey rowKey = + EntityRowKeyConverter.getInstance().decode(byteRowKey); + assertEquals(CLUSTER, rowKey.getClusterId()); + assertEquals(USER, rowKey.getUserId()); + assertEquals(FLOW_NAME, rowKey.getFlowName()); + assertEquals(FLOW_RUN_ID, rowKey.getFlowRunId()); + assertEquals(APPLICATION_ID, rowKey.getAppId()); + assertEquals(entityType, rowKey.getEntityType()); + assertEquals(entityId, rowKey.getEntityId()); + + byte[] byteRowKeyPrefix = EntityRowKeyConverter.getInstance().encode( + new EntityRowKey(CLUSTER, USER, FLOW_NAME, FLOW_RUN_ID, APPLICATION_ID, + entityType, null)); + byte[][] splits = + Separator.QUALIFIERS.split(byteRowKeyPrefix, new int[] { + Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE, + Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG, + AppIdKeyConverter.getKeySize(), Separator.VARIABLE_SIZE, + Separator.VARIABLE_SIZE }); + assertEquals(7, splits.length); + assertEquals(0, splits[6].length); + assertEquals(APPLICATION_ID, + AppIdKeyConverter.getInstance().decode(splits[4])); + assertEquals(entityType, Separator.QUALIFIERS.decode( + Bytes.toString(splits[5]))); + verifyRowPrefixBytes(byteRowKeyPrefix); + + byteRowKeyPrefix = EntityRowKeyConverter.getInstance().encode( + new EntityRowKey(CLUSTER, USER, FLOW_NAME, FLOW_RUN_ID, APPLICATION_ID, + null, null)); + splits = Separator.QUALIFIERS.split(byteRowKeyPrefix, new int[] { + Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE, + Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG, + AppIdKeyConverter.getKeySize(), Separator.VARIABLE_SIZE }); + assertEquals(6, splits.length); + assertEquals(0, splits[5].length); + assertEquals(APPLICATION_ID, + AppIdKeyConverter.getInstance().decode(splits[4])); + verifyRowPrefixBytes(byteRowKeyPrefix); + } + + @Test + public void testAppToFlowRowKeyConverter() { + byte[] byteRowKey = AppToFlowRowKeyConverter.getInstance().encode( + new AppToFlowRowKey(CLUSTER, APPLICATION_ID)); + AppToFlowRowKey rowKey = + AppToFlowRowKeyConverter.getInstance().decode(byteRowKey); + assertEquals(CLUSTER, rowKey.getClusterId()); + assertEquals(APPLICATION_ID, rowKey.getAppId()); + } + + @Test + public void testAppIdKeyConverter() { + long currentTs = System.currentTimeMillis(); + ApplicationId appId1 = ApplicationId.newInstance(currentTs, 1); + ApplicationId appId2 = ApplicationId.newInstance(currentTs, 2); + ApplicationId appId3 = ApplicationId.newInstance(currentTs + 300, 1); + String appIdStr1 = appId1.toString(); + String appIdStr2 = appId2.toString(); + String appIdStr3 = appId3.toString(); + byte[] appIdBytes1 = AppIdKeyConverter.getInstance().encode(appIdStr1); + byte[] appIdBytes2 = AppIdKeyConverter.getInstance().encode(appIdStr2); + byte[] appIdBytes3 = AppIdKeyConverter.getInstance().encode(appIdStr3); + // App ids' should be encoded in a manner wherein descending order + // is maintained. + assertTrue("Ordering of app ids' is incorrect", + Bytes.compareTo(appIdBytes1, appIdBytes2) > 0 && + Bytes.compareTo(appIdBytes1, appIdBytes3) > 0 && + Bytes.compareTo(appIdBytes2, appIdBytes3) > 0); + String decodedAppId1 = AppIdKeyConverter.getInstance().decode(appIdBytes1); + String decodedAppId2 = AppIdKeyConverter.getInstance().decode(appIdBytes2); + String decodedAppId3 = AppIdKeyConverter.getInstance().decode(appIdBytes3); + assertTrue("Decoded app id is not same as the app id encoded", + appIdStr1.equals(decodedAppId1)); + assertTrue("Decoded app id is not same as the app id encoded", + appIdStr2.equals(decodedAppId2)); + assertTrue("Decoded app id is not same as the app id encoded", + appIdStr3.equals(decodedAppId3)); + } + + @Test + public void testEventColumnNameConverter() { + String eventId = "=foo_=eve=nt="; + byte[] valSepBytes = Bytes.toBytes(Separator.VALUES.getValue()); + byte[] maxByteArr = + Bytes.createMaxByteArray(Bytes.SIZEOF_LONG - valSepBytes.length); + byte[] ts = Bytes.add(valSepBytes, maxByteArr); + Long eventTs = Bytes.toLong(ts); + byte[] byteEventColName = EventColumnNameConverter.getInstance().encode( + new EventColumnName(eventId, eventTs, null)); + EventColumnName eventColName = + EventColumnNameConverter.getInstance().decode(byteEventColName); + assertEquals(eventId, eventColName.getId()); + assertEquals(eventTs, eventColName.getTimestamp()); + assertNull(eventColName.getInfoKey()); + + String infoKey = "f=oo_event_in=fo=_key"; + byteEventColName = EventColumnNameConverter.getInstance().encode( + new EventColumnName(eventId, eventTs, infoKey)); + eventColName = + EventColumnNameConverter.getInstance().decode(byteEventColName); + assertEquals(eventId, eventColName.getId()); + assertEquals(eventTs, eventColName.getTimestamp()); + assertEquals(infoKey, eventColName.getInfoKey()); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2229377/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestSeparator.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestSeparator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestSeparator.java index 8b25a83..0cda97c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestSeparator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestSeparator.java @@ -19,11 +19,13 @@ package org.apache.hadoop.yarn.server.timelineservice.storage.common; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import org.apache.hadoop.hbase.util.Bytes; import org.junit.Test; import com.google.common.collect.Iterables; @@ -32,7 +34,7 @@ public class TestSeparator { private static String villain = "Dr. Heinz Doofenshmirtz"; private static String special = - ". * | ? + ( ) [ ] { } ^ $ \\ \""; + ". * | ? + \t ( ) [ ] { } ^ $ \\ \""; /** * @@ -47,6 +49,7 @@ public class TestSeparator { testEncodeDecode(separator, "?"); testEncodeDecode(separator, "&"); testEncodeDecode(separator, "+"); + testEncodeDecode(separator, "\t"); testEncodeDecode(separator, "Dr."); testEncodeDecode(separator, "Heinz"); testEncodeDecode(separator, "Doofenshmirtz"); @@ -79,6 +82,83 @@ public class TestSeparator { } + @Test + public void testSplits() { + byte[] maxLongBytes = Bytes.toBytes(Long.MAX_VALUE); + byte[] maxIntBytes = Bytes.toBytes(Integer.MAX_VALUE); + for (Separator separator : Separator.values()) { + String str1 = "cl" + separator.getValue() + "us"; + String str2 = separator.getValue() + "rst"; + byte[] sepByteArr = Bytes.toBytes(separator.getValue()); + byte[] longVal1Arr = Bytes.add(sepByteArr, Bytes.copy(maxLongBytes, + sepByteArr.length, Bytes.SIZEOF_LONG - sepByteArr.length)); + byte[] intVal1Arr = Bytes.add(sepByteArr, Bytes.copy(maxIntBytes, + sepByteArr.length, Bytes.SIZEOF_INT - sepByteArr.length)); + byte[] arr = separator.join( + Bytes.toBytes(separator.encode(str1)),longVal1Arr, + Bytes.toBytes(separator.encode(str2)), intVal1Arr); + int[] sizes = { Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG, + Separator.VARIABLE_SIZE, Bytes.SIZEOF_INT }; + byte[][] splits = separator.split(arr, sizes); + assertEquals(4, splits.length); + assertEquals(str1, separator.decode(Bytes.toString(splits[0]))); + assertEquals(Bytes.toLong(longVal1Arr), Bytes.toLong(splits[1])); + assertEquals(str2, separator.decode(Bytes.toString(splits[2]))); + assertEquals(Bytes.toInt(intVal1Arr), Bytes.toInt(splits[3])); + + longVal1Arr = Bytes.add(Bytes.copy(maxLongBytes, 0, Bytes.SIZEOF_LONG - + sepByteArr.length), sepByteArr); + intVal1Arr = Bytes.add(Bytes.copy(maxIntBytes, 0, Bytes.SIZEOF_INT - + sepByteArr.length), sepByteArr); + arr = separator.join(Bytes.toBytes(separator.encode(str1)),longVal1Arr, + Bytes.toBytes(separator.encode(str2)), intVal1Arr); + splits = separator.split(arr, sizes); + assertEquals(4, splits.length); + assertEquals(str1, separator.decode(Bytes.toString(splits[0]))); + assertEquals(Bytes.toLong(longVal1Arr), Bytes.toLong(splits[1])); + assertEquals(str2, separator.decode(Bytes.toString(splits[2]))); + assertEquals(Bytes.toInt(intVal1Arr), Bytes.toInt(splits[3])); + + longVal1Arr = Bytes.add(sepByteArr, Bytes.copy(maxLongBytes, + sepByteArr.length, 4 - sepByteArr.length), sepByteArr); + longVal1Arr = Bytes.add(longVal1Arr, Bytes.copy(maxLongBytes, 4, 3 - + sepByteArr.length), sepByteArr); + arr = separator.join(Bytes.toBytes(separator.encode(str1)),longVal1Arr, + Bytes.toBytes(separator.encode(str2)), intVal1Arr); + splits = separator.split(arr, sizes); + assertEquals(4, splits.length); + assertEquals(str1, separator.decode(Bytes.toString(splits[0]))); + assertEquals(Bytes.toLong(longVal1Arr), Bytes.toLong(splits[1])); + assertEquals(str2, separator.decode(Bytes.toString(splits[2]))); + assertEquals(Bytes.toInt(intVal1Arr), Bytes.toInt(splits[3])); + + arr = separator.join(Bytes.toBytes(separator.encode(str1)), + Bytes.toBytes(separator.encode(str2)), intVal1Arr, longVal1Arr); + int[] sizes1 = { Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE, + Bytes.SIZEOF_INT, Bytes.SIZEOF_LONG }; + splits = separator.split(arr, sizes1); + assertEquals(4, splits.length); + assertEquals(str1, separator.decode(Bytes.toString(splits[0]))); + assertEquals(str2, separator.decode(Bytes.toString(splits[1]))); + assertEquals(Bytes.toInt(intVal1Arr), Bytes.toInt(splits[2])); + assertEquals(Bytes.toLong(longVal1Arr), Bytes.toLong(splits[3])); + + try { + int[] sizes2 = { Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE, + Bytes.SIZEOF_INT, 7 }; + splits = separator.split(arr, sizes2); + fail("Exception should have been thrown."); + } catch (IllegalArgumentException e) {} + + try { + int[] sizes2 = { Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE, 2, + Bytes.SIZEOF_LONG }; + splits = separator.split(arr, sizes2); + fail("Exception should have been thrown."); + } catch (IllegalArgumentException e) {} + } + } + /** * Simple test to encode and decode using the same separators and confirm that * we end up with the same as what we started with. http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2229377/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestTimelineStorageUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestTimelineStorageUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestTimelineStorageUtils.java deleted file mode 100644 index 046eda1..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestTimelineStorageUtils.java +++ /dev/null @@ -1,56 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package org.apache.hadoop.yarn.server.timelineservice.storage.common; - -import static org.junit.Assert.assertTrue; - -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.junit.Test; - -public class TestTimelineStorageUtils { - - @Test - public void testEncodeDecodeAppId() { - long currentTs = System.currentTimeMillis(); - ApplicationId appId1 = ApplicationId.newInstance(currentTs, 1); - ApplicationId appId2 = ApplicationId.newInstance(currentTs, 2); - ApplicationId appId3 = ApplicationId.newInstance(currentTs + 300, 1); - String appIdStr1 = appId1.toString(); - String appIdStr2 = appId2.toString(); - String appIdStr3 = appId3.toString(); - byte[] appIdBytes1 = TimelineStorageUtils.encodeAppId(appIdStr1); - byte[] appIdBytes2 = TimelineStorageUtils.encodeAppId(appIdStr2); - byte[] appIdBytes3 = TimelineStorageUtils.encodeAppId(appIdStr3); - // App ids' should be encoded in a manner wherein descending order - // is maintained. - assertTrue("Ordering of app ids' is incorrect", - Bytes.compareTo(appIdBytes1, appIdBytes2) > 0 && - Bytes.compareTo(appIdBytes1, appIdBytes3) > 0 && - Bytes.compareTo(appIdBytes2, appIdBytes3) > 0); - String decodedAppId1 = TimelineStorageUtils.decodeAppId(appIdBytes1); - String decodedAppId2 = TimelineStorageUtils.decodeAppId(appIdBytes2); - String decodedAppId3 = TimelineStorageUtils.decodeAppId(appIdBytes3); - assertTrue("Decoded app id is not same as the app id encoded", - appIdStr1.equals(decodedAppId1)); - assertTrue("Decoded app id is not same as the app id encoded", - appIdStr2.equals(decodedAppId2)); - assertTrue("Decoded app id is not same as the app id encoded", - appIdStr3.equals(decodedAppId3)); - } -} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org