Repository: hadoop
Updated Branches:
  refs/heads/YARN-2928 10b26bb9f -> e2229377b


http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2229377/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKeyConverter.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKeyConverter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKeyConverter.java
new file mode 100644
index 0000000..642f065
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKeyConverter.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
+
+/**
+ * Encodes and decodes the row key for the flow run table.
+ * The row key is of the form: clusterId!userId!flowName!flowRunId.
+ * flowRunId is a long and the rest are strings.
+ */
+public final class FlowRunRowKeyConverter implements
+    KeyConverter<FlowRunRowKey> {
+  private static final FlowRunRowKeyConverter INSTANCE =
+      new FlowRunRowKeyConverter();
+
+  public static FlowRunRowKeyConverter getInstance() {
+    return INSTANCE;
+  }
+
+  private FlowRunRowKeyConverter() {
+  }
+
+  // Flow run row key is of the form
+  // clusterId!userId!flowName!flowRunId with each segment separated by !.
+  // The sizes below indicate the size of each of these segments in sequence.
+  // clusterId, userId and flowName are strings. flowRunId is a long, hence
+  // 8 bytes in size. Strings are variable in size (i.e. they end whenever the
+  // separator is encountered). This is used while decoding and helps in
+  // determining where to split.
+  private static final int[] SEGMENT_SIZES = {
+      Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE,
+      Bytes.SIZEOF_LONG };
+
+  /*
+   * (non-Javadoc)
+   *
+   * Encodes a FlowRunRowKey object into a byte array with each
+   * component/field in FlowRunRowKey separated by Separator#QUALIFIERS.
+   * This leads to a flow run row key of the form
+   * clusterId!userId!flowName!flowRunId.
+   * If flowRunId in the passed FlowRunRowKey object is null (and the fields
+   * preceding it, i.e. clusterId, userId and flowName, are not null), this
+   * returns a row key prefix of the form clusterId!userId!flowName!
+   * flowRunId is inverted while encoding as it helps maintain a descending
+   * order for flow keys in the flow run table.
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter
+   * #encode(java.lang.Object)
+   */
+  @Override
+  public byte[] encode(FlowRunRowKey rowKey) {
+    byte[] first = Separator.QUALIFIERS.join(
+        Separator.encode(rowKey.getClusterId(), Separator.SPACE, Separator.TAB,
+            Separator.QUALIFIERS),
+        Separator.encode(rowKey.getUserId(), Separator.SPACE, Separator.TAB,
+            Separator.QUALIFIERS),
+        Separator.encode(rowKey.getFlowName(), Separator.SPACE, Separator.TAB,
+            Separator.QUALIFIERS));
+    if (rowKey.getFlowRunId() == null) {
+      return Separator.QUALIFIERS.join(first, Separator.EMPTY_BYTES);
+    } else {
+      // Note that flowRunId is a long, so we can't encode it together with
+      // the string components in a single pass.
+      byte[] second = Bytes.toBytes(TimelineStorageUtils.invertLong(
+          rowKey.getFlowRunId()));
+      return Separator.QUALIFIERS.join(first, second);
+    }
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * Decodes a flow run row key of the form
+   * clusterId!userId!flowName!flowRunId represented in byte format and
+   * converts it into a FlowRunRowKey object. flowRunId is inverted while
+   * decoding as it was inverted while encoding.
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter
+   * #decode(byte[])
+   */
+  @Override
+  public FlowRunRowKey decode(byte[] rowKey) {
+    byte[][] rowKeyComponents =
+        Separator.QUALIFIERS.split(rowKey, SEGMENT_SIZES);
+    if (rowKeyComponents.length != 4) {
+      throw new IllegalArgumentException("the row key is not valid for " +
+          "a flow run");
+    }
+    String clusterId = Separator.decode(Bytes.toString(rowKeyComponents[0]),
+        Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
+    String userId = Separator.decode(Bytes.toString(rowKeyComponents[1]),
+        Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
+    String flowName = Separator.decode(Bytes.toString(rowKeyComponents[2]),
+        Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
+    Long flowRunId =
+        TimelineStorageUtils.invertLong(Bytes.toLong(rowKeyComponents[3]));
+    return new FlowRunRowKey(clusterId, userId, flowName, flowRunId);
+  }
+}
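
A minimal round-trip sketch of the new converter, with made-up ids; the exact behavior of TimelineStorageUtils.invertLong (assumed here to be Long.MAX_VALUE minus the value) is inferred from the descending-order comment above and is not part of this diff:

    FlowRunRowKeyConverter conv = FlowRunRowKeyConverter.getInstance();

    // Full row key: clusterId!userId!flowName!invertedFlowRunId.
    byte[] full = conv.encode(
        new FlowRunRowKey("test-cluster", "alice", "daily-etl", 1002L));
    FlowRunRowKey decoded = conv.decode(full);
    // decoded.getFlowRunId() == 1002L; the stored run id bytes are
    // inverted so that newer (larger) run ids sort first in HBase.

    // Prefix for scans: a null flowRunId (with all preceding fields
    // non-null) yields clusterId!userId!flowName! and nothing more.
    byte[] prefix = conv.encode(
        new FlowRunRowKey("test-cluster", "alice", "daily-etl", null));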

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2229377/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
index 398d7b4..648c77b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
@@ -44,9 +44,10 @@ import org.apache.hadoop.hbase.util.Bytes.ByteArrayComparator;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.GenericConverter;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.NumericValueConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimestampGenerator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -193,7 +194,7 @@ class FlowScanner implements RegionScanner, Closeable {
     // So all cells in one qualifier come one after the other before we see the
     // next column qualifier
     ByteArrayComparator comp = new ByteArrayComparator();
-    byte[] currentColumnQualifier = TimelineStorageUtils.EMPTY_BYTES;
+    byte[] currentColumnQualifier = Separator.EMPTY_BYTES;
     AggregationOperation currentAggOp = null;
     SortedSet<Cell> currentColumnCells = new TreeSet<>(KeyValue.COMPARATOR);
     Set<String> alreadySeenAggDim = new HashSet<>();
@@ -314,7 +315,7 @@ class FlowScanner implements RegionScanner, Closeable {
             + " cell qualifier="
             + Bytes.toString(CellUtil.cloneQualifier(cell))
             + " cell value= "
-            + (Number) converter.decodeValue(CellUtil.cloneValue(cell))
+            + converter.decodeValue(CellUtil.cloneValue(cell))
             + " timestamp=" + cell.getTimestamp());
       }
 
@@ -480,7 +481,7 @@ class FlowScanner implements RegionScanner, Closeable {
             LOG.trace("MAJOR COMPACTION loop sum= " + sum
                 + " discarding now: " + " qualifier="
                 + Bytes.toString(CellUtil.cloneQualifier(cell)) + " value="
-                + (Number) converter.decodeValue(CellUtil.cloneValue(cell))
+                + converter.decodeValue(CellUtil.cloneValue(cell))
                 + " timestamp=" + cell.getTimestamp() + " " + this.action);
           }
         } else {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2229377/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java
index d8ca038..faecd14 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrie
 import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters;
 import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongKeyConverter;
 import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityColumnPrefix;
 import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKey;
 import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTable;
@@ -125,7 +126,7 @@ class FlowActivityEntityReader extends TimelineEntityReader {
   protected TimelineEntity parseEntity(Result result) throws IOException {
    FlowActivityRowKey rowKey = FlowActivityRowKey.parseRowKey(result.getRow());
 
-    long time = rowKey.getDayTimestamp();
+    Long time = rowKey.getDayTimestamp();
     String user = rowKey.getUserId();
     String flowName = rowKey.getFlowName();
 
@@ -135,10 +136,11 @@ class FlowActivityEntityReader extends TimelineEntityReader {
     flowActivity.setId(flowActivity.getId());
     // get the list of run ids along with the version that are associated with
     // this flow on this day
-    Map<String, Object> runIdsMap =
-        FlowActivityColumnPrefix.RUN_ID.readResults(result);
-    for (Map.Entry<String, Object> e : runIdsMap.entrySet()) {
-      Long runId = Long.valueOf(e.getKey());
+    Map<Long, Object> runIdsMap =
+        FlowActivityColumnPrefix.RUN_ID.readResults(result,
+            LongKeyConverter.getInstance());
+    for (Map.Entry<Long, Object> e : runIdsMap.entrySet()) {
+      Long runId = e.getKey();
       String version = (String)e.getValue();
       FlowRunEntity flowRun = new FlowRunEntity();
       flowRun.setUser(user);
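
LongKeyConverter itself is not shown in this commit. A plausible minimal sketch of such a converter, inferred from the encode/decode usage above (illustrative only; the committed class may additionally handle separator bytes in the encoding):

    // Hypothetical sketch; assumes the KeyConverter<T> interface exposes
    // encode/decode the way FlowRunRowKeyConverter above implements them.
    public final class LongKeyConverterSketch implements KeyConverter<Long> {
      @Override
      public byte[] encode(Long key) {
        return Bytes.toBytes(key.longValue()); // 8 big-endian bytes
      }

      @Override
      public Long decode(byte[] bytes) {
        return Bytes.toLong(bytes);
      }
    }

With a typed converter, the reader gets Long keys directly from readResults instead of parsing them out of strings with Long.valueOf, as the removed lines did.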

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2229377/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java
index 4299de9..be27643 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilter
 import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.StringKeyConverter;
 
 /**
  * The base class for reading and deserializing timeline entities from the
@@ -329,7 +330,8 @@ public abstract class TimelineEntityReader {
   protected void readMetrics(TimelineEntity entity, Result result,
       ColumnPrefix<?> columnPrefix) throws IOException {
     NavigableMap<String, NavigableMap<Long, Number>> metricsResult =
-        columnPrefix.readResultsWithTimestamps(result);
+        columnPrefix.readResultsWithTimestamps(
+            result, StringKeyConverter.getInstance());
     for (Map.Entry<String, NavigableMap<Long, Number>> metricResult:
         metricsResult.entrySet()) {
       TimelineMetric metric = new TimelineMetric();
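
StringKeyConverter is likewise new here but not part of the diff. Judging from the Separator calls used by the row key converters above, a plausible shape is (an assumption, not the committed source):

    public final class StringKeyConverterSketch implements KeyConverter<String> {
      @Override
      public byte[] encode(String key) {
        // Escape separator characters the same way the row key converters do.
        return Separator.encode(key, Separator.SPACE, Separator.TAB,
            Separator.QUALIFIERS);
      }

      @Override
      public String decode(byte[] bytes) {
        return Separator.decode(Bytes.toString(bytes),
            Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
      }
    }

Passing the converter into readResultsWithTimestamps keeps readMetrics agnostic of how metric column qualifiers are encoded on disk.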

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2229377/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestKeyConverters.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestKeyConverters.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestKeyConverters.java
new file mode 100644
index 0000000..74e4b5d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestKeyConverters.java
@@ -0,0 +1,293 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKeyConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKeyConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKeyConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKeyConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKeyConverter;
+import org.junit.Test;
+
+public class TestKeyConverters {
+  private final static String QUALIFIER_SEP = Separator.QUALIFIERS.getValue();
+  private final static byte[] QUALIFIER_SEP_BYTES =
+      Bytes.toBytes(QUALIFIER_SEP);
+  private final static String CLUSTER = "cl" + QUALIFIER_SEP + "uster";
+  private final static String USER = QUALIFIER_SEP + "user";
+  private final static String FLOW_NAME =
+      "dummy_" + QUALIFIER_SEP + "flow" + QUALIFIER_SEP;
+  private final static Long FLOW_RUN_ID;
+  private final static String APPLICATION_ID;
+  static {
+    long runid = Long.MAX_VALUE - 900L;
+    byte[] longMaxByteArr = Bytes.toBytes(Long.MAX_VALUE);
+    byte[] byteArr = Bytes.toBytes(runid);
+    int sepByteLen = QUALIFIER_SEP_BYTES.length;
+    if (sepByteLen <= byteArr.length) {
+      for (int i = 0; i < sepByteLen; i++) {
+        byteArr[i] = (byte)(longMaxByteArr[i] - QUALIFIER_SEP_BYTES[i]);
+      }
+    }
+    FLOW_RUN_ID = Bytes.toLong(byteArr);
+    long clusterTs = System.currentTimeMillis();
+    byteArr = Bytes.toBytes(clusterTs);
+    if (sepByteLen <= byteArr.length) {
+      for (int i = 0; i < sepByteLen; i++) {
+        byteArr[byteArr.length - sepByteLen + i] =
+            (byte)(longMaxByteArr[byteArr.length - sepByteLen + i] -
+                QUALIFIER_SEP_BYTES[i]);
+      }
+    }
+    clusterTs = Bytes.toLong(byteArr);
+    int seqId = 222;
+    APPLICATION_ID = ApplicationId.newInstance(clusterTs, seqId).toString();
+  }
+
+  private static void verifyRowPrefixBytes(byte[] byteRowKeyPrefix) {
+    int sepLen = QUALIFIER_SEP_BYTES.length;
+    for (int i = 0; i < sepLen; i++) {
+      assertTrue("Row key prefix not encoded properly.",
+        byteRowKeyPrefix[byteRowKeyPrefix.length - sepLen + i] ==
+            QUALIFIER_SEP_BYTES[i]);
+    }
+  }
+
+  @Test
+  public void testFlowActivityRowKeyConverter() {
+    Long ts = TimelineStorageUtils.getTopOfTheDayTimestamp(1459900830000L);
+    byte[] byteRowKey = FlowActivityRowKeyConverter.getInstance().encode(
+        new FlowActivityRowKey(CLUSTER, ts, USER, FLOW_NAME));
+    FlowActivityRowKey rowKey =
+        FlowActivityRowKeyConverter.getInstance().decode(byteRowKey);
+    assertEquals(CLUSTER, rowKey.getClusterId());
+    assertEquals(ts, rowKey.getDayTimestamp());
+    assertEquals(USER, rowKey.getUserId());
+    assertEquals(FLOW_NAME, rowKey.getFlowName());
+
+    byte[] byteRowKeyPrefix = FlowActivityRowKeyConverter.getInstance().encode(
+        new FlowActivityRowKey(CLUSTER, null, null, null));
+    byte[][] splits = Separator.QUALIFIERS.split(byteRowKeyPrefix, new int[] {
+        Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE });
+    assertEquals(2, splits.length);
+    assertEquals(0, splits[1].length);
+    assertEquals(CLUSTER,
+        Separator.QUALIFIERS.decode(Bytes.toString(splits[0])));
+    verifyRowPrefixBytes(byteRowKeyPrefix);
+
+    byteRowKeyPrefix = FlowActivityRowKeyConverter.getInstance().encode(
+        new FlowActivityRowKey(CLUSTER, ts, null, null));
+    splits = Separator.QUALIFIERS.split(byteRowKeyPrefix, new int[] {
+        Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG, Separator.VARIABLE_SIZE });
+    assertEquals(3, splits.length);
+    assertEquals(0, splits[2].length);
+    assertEquals(CLUSTER,
+        Separator.QUALIFIERS.decode(Bytes.toString(splits[0])));
+    assertEquals(ts, (Long) TimelineStorageUtils.invertLong(
+        Bytes.toLong(splits[1])));
+    verifyRowPrefixBytes(byteRowKeyPrefix);
+  }
+
+  @Test
+  public void testFlowRunRowKeyConverter() {
+    byte[] byteRowKey = FlowRunRowKeyConverter.getInstance().encode(
+        new FlowRunRowKey(CLUSTER, USER, FLOW_NAME, FLOW_RUN_ID));
+    FlowRunRowKey rowKey =
+        FlowRunRowKeyConverter.getInstance().decode(byteRowKey);
+    assertEquals(CLUSTER, rowKey.getClusterId());
+    assertEquals(USER, rowKey.getUserId());
+    assertEquals(FLOW_NAME, rowKey.getFlowName());
+    assertEquals(FLOW_RUN_ID, rowKey.getFlowRunId());
+
+    byte[] byteRowKeyPrefix = FlowRunRowKeyConverter.getInstance().encode(
+        new FlowRunRowKey(CLUSTER, USER, FLOW_NAME, null));
+    byte[][] splits = Separator.QUALIFIERS.split(byteRowKeyPrefix, new int[] {
+        Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE,
+        Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE });
+    assertEquals(4, splits.length);
+    assertEquals(0, splits[3].length);
+    assertEquals(FLOW_NAME,
+        Separator.QUALIFIERS.decode(Bytes.toString(splits[2])));
+    verifyRowPrefixBytes(byteRowKeyPrefix);
+  }
+
+  @Test
+  public void testApplicationRowKeyConverter() {
+    byte[] byteRowKey = ApplicationRowKeyConverter.getInstance().encode(
+        new ApplicationRowKey(CLUSTER, USER, FLOW_NAME, FLOW_RUN_ID,
+            APPLICATION_ID));
+    ApplicationRowKey rowKey =
+        ApplicationRowKeyConverter.getInstance().decode(byteRowKey);
+    assertEquals(CLUSTER, rowKey.getClusterId());
+    assertEquals(USER, rowKey.getUserId());
+    assertEquals(FLOW_NAME, rowKey.getFlowName());
+    assertEquals(FLOW_RUN_ID, rowKey.getFlowRunId());
+    assertEquals(APPLICATION_ID, rowKey.getAppId());
+
+    byte[] byteRowKeyPrefix = ApplicationRowKeyConverter.getInstance().encode(
+        new ApplicationRowKey(CLUSTER, USER, FLOW_NAME, FLOW_RUN_ID, null));
+    byte[][] splits =
+        Separator.QUALIFIERS.split(byteRowKeyPrefix, new int[] {
+            Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE,
+            Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG,
+            Separator.VARIABLE_SIZE });
+    assertEquals(5, splits.length);
+    assertEquals(0, splits[4].length);
+    assertEquals(FLOW_NAME,
+        Separator.QUALIFIERS.decode(Bytes.toString(splits[2])));
+    assertEquals(FLOW_RUN_ID, (Long)TimelineStorageUtils.invertLong(
+        Bytes.toLong(splits[3])));
+    verifyRowPrefixBytes(byteRowKeyPrefix);
+
+    byteRowKeyPrefix = ApplicationRowKeyConverter.getInstance().encode(
+        new ApplicationRowKey(CLUSTER, USER, FLOW_NAME, null, null));
+    splits = Separator.QUALIFIERS.split(byteRowKeyPrefix, new int[] {
+        Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE,
+        Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE });
+    assertEquals(4, splits.length);
+    assertEquals(0, splits[3].length);
+    assertEquals(FLOW_NAME,
+        Separator.QUALIFIERS.decode(Bytes.toString(splits[2])));
+    verifyRowPrefixBytes(byteRowKeyPrefix);
+  }
+
+  @Test
+  public void testEntityRowKeyConverter() {
+    String entityId = "!ent!ity!!id!";
+    String entityType = "entity!Type";
+    byte[] byteRowKey = EntityRowKeyConverter.getInstance().encode(
+        new EntityRowKey(CLUSTER, USER, FLOW_NAME, FLOW_RUN_ID, APPLICATION_ID,
+            entityType, entityId));
+    EntityRowKey rowKey =
+        EntityRowKeyConverter.getInstance().decode(byteRowKey);
+    assertEquals(CLUSTER, rowKey.getClusterId());
+    assertEquals(USER, rowKey.getUserId());
+    assertEquals(FLOW_NAME, rowKey.getFlowName());
+    assertEquals(FLOW_RUN_ID, rowKey.getFlowRunId());
+    assertEquals(APPLICATION_ID, rowKey.getAppId());
+    assertEquals(entityType, rowKey.getEntityType());
+    assertEquals(entityId, rowKey.getEntityId());
+
+    byte[] byteRowKeyPrefix = EntityRowKeyConverter.getInstance().encode(
+        new EntityRowKey(CLUSTER, USER, FLOW_NAME, FLOW_RUN_ID, APPLICATION_ID,
+            entityType, null));
+    byte[][] splits =
+        Separator.QUALIFIERS.split(byteRowKeyPrefix, new int[] {
+            Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE,
+            Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG,
+            AppIdKeyConverter.getKeySize(), Separator.VARIABLE_SIZE,
+            Separator.VARIABLE_SIZE });
+    assertEquals(7, splits.length);
+    assertEquals(0, splits[6].length);
+    assertEquals(APPLICATION_ID,
+        AppIdKeyConverter.getInstance().decode(splits[4]));
+    assertEquals(entityType, Separator.QUALIFIERS.decode(
+        Bytes.toString(splits[5])));
+    verifyRowPrefixBytes(byteRowKeyPrefix);
+
+    byteRowKeyPrefix = EntityRowKeyConverter.getInstance().encode(
+        new EntityRowKey(CLUSTER, USER, FLOW_NAME, FLOW_RUN_ID, APPLICATION_ID,
+        null, null));
+    splits = Separator.QUALIFIERS.split(byteRowKeyPrefix, new int[] {
+        Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE,
+        Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG,
+        AppIdKeyConverter.getKeySize(), Separator.VARIABLE_SIZE });
+    assertEquals(6, splits.length);
+    assertEquals(0, splits[5].length);
+    assertEquals(APPLICATION_ID,
+        AppIdKeyConverter.getInstance().decode(splits[4]));
+    verifyRowPrefixBytes(byteRowKeyPrefix);
+  }
+
+  @Test
+  public void testAppToFlowRowKeyConverter() {
+    byte[] byteRowKey = AppToFlowRowKeyConverter.getInstance().encode(
+        new AppToFlowRowKey(CLUSTER, APPLICATION_ID));
+    AppToFlowRowKey rowKey =
+        AppToFlowRowKeyConverter.getInstance().decode(byteRowKey);
+    assertEquals(CLUSTER, rowKey.getClusterId());
+    assertEquals(APPLICATION_ID, rowKey.getAppId());
+  }
+
+  @Test
+  public void testAppIdKeyConverter() {
+    long currentTs = System.currentTimeMillis();
+    ApplicationId appId1 = ApplicationId.newInstance(currentTs, 1);
+    ApplicationId appId2 = ApplicationId.newInstance(currentTs, 2);
+    ApplicationId appId3 = ApplicationId.newInstance(currentTs + 300, 1);
+    String appIdStr1 = appId1.toString();
+    String appIdStr2 = appId2.toString();
+    String appIdStr3 = appId3.toString();
+    byte[] appIdBytes1 = AppIdKeyConverter.getInstance().encode(appIdStr1);
+    byte[] appIdBytes2 = AppIdKeyConverter.getInstance().encode(appIdStr2);
+    byte[] appIdBytes3 = AppIdKeyConverter.getInstance().encode(appIdStr3);
+    // App ids should be encoded in a manner that maintains descending order.
+    assertTrue("Ordering of app ids is incorrect",
+        Bytes.compareTo(appIdBytes1, appIdBytes2) > 0 &&
+        Bytes.compareTo(appIdBytes1, appIdBytes3) > 0 &&
+        Bytes.compareTo(appIdBytes2, appIdBytes3) > 0);
+    String decodedAppId1 = AppIdKeyConverter.getInstance().decode(appIdBytes1);
+    String decodedAppId2 = AppIdKeyConverter.getInstance().decode(appIdBytes2);
+    String decodedAppId3 = AppIdKeyConverter.getInstance().decode(appIdBytes3);
+    assertTrue("Decoded app id is not the same as the encoded app id",
+        appIdStr1.equals(decodedAppId1));
+    assertTrue("Decoded app id is not the same as the encoded app id",
+        appIdStr2.equals(decodedAppId2));
+    assertTrue("Decoded app id is not the same as the encoded app id",
+        appIdStr3.equals(decodedAppId3));
+  }
+
+  @Test
+  public void testEventColumnNameConverter() {
+    String eventId = "=foo_=eve=nt=";
+    byte[] valSepBytes = Bytes.toBytes(Separator.VALUES.getValue());
+    byte[] maxByteArr =
+        Bytes.createMaxByteArray(Bytes.SIZEOF_LONG - valSepBytes.length);
+    byte[] ts = Bytes.add(valSepBytes, maxByteArr);
+    Long eventTs = Bytes.toLong(ts);
+    byte[] byteEventColName = EventColumnNameConverter.getInstance().encode(
+        new EventColumnName(eventId, eventTs, null));
+    EventColumnName eventColName =
+        EventColumnNameConverter.getInstance().decode(byteEventColName);
+    assertEquals(eventId, eventColName.getId());
+    assertEquals(eventTs, eventColName.getTimestamp());
+    assertNull(eventColName.getInfoKey());
+
+    String infoKey = "f=oo_event_in=fo=_key";
+    byteEventColName = EventColumnNameConverter.getInstance().encode(
+        new EventColumnName(eventId, eventTs, infoKey));
+    eventColName =
+        EventColumnNameConverter.getInstance().decode(byteEventColName);
+    assertEquals(eventId, eventColName.getId());
+    assertEquals(eventTs, eventColName.getTimestamp());
+    assertEquals(infoKey, eventColName.getInfoKey());
+  }
+}
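
Several of the assertions above depend on inverted encodings. A tiny worked example of the idea, assuming invertLong(x) is Long.MAX_VALUE - x (consistent with the descending-order comments, though the implementation is not part of this diff):

    long newer = TimelineStorageUtils.invertLong(2000L); // MAX_VALUE - 2000
    long older = TimelineStorageUtils.invertLong(1000L); // MAX_VALUE - 1000
    // newer < older, so a byte-wise lexicographic scan over the encoded
    // keys returns the most recent run (or timestamp) first.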

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2229377/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestSeparator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestSeparator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestSeparator.java
index 8b25a83..0cda97c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestSeparator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestSeparator.java
@@ -19,11 +19,13 @@ package org.apache.hadoop.yarn.server.timelineservice.storage.common;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
+import org.apache.hadoop.hbase.util.Bytes;
 import org.junit.Test;
 
 import com.google.common.collect.Iterables;
@@ -32,7 +34,7 @@ public class TestSeparator {
 
   private static String villain = "Dr. Heinz Doofenshmirtz";
   private static String special =
-      ".   *   |   ?   +   (   )   [   ]   {   }   ^   $  \\ \"";
+      ".   *   |   ?   +   \t   (   )   [   ]   {   }   ^   $  \\ \"";
 
   /**
    *
@@ -47,6 +49,7 @@ public class TestSeparator {
       testEncodeDecode(separator, "?");
       testEncodeDecode(separator, "&");
       testEncodeDecode(separator, "+");
+      testEncodeDecode(separator, "\t");
       testEncodeDecode(separator, "Dr.");
       testEncodeDecode(separator, "Heinz");
       testEncodeDecode(separator, "Doofenshmirtz");
@@ -79,6 +82,83 @@ public class TestSeparator {
 
   }
 
+  @Test
+  public void testSplits() {
+    byte[] maxLongBytes = Bytes.toBytes(Long.MAX_VALUE);
+    byte[] maxIntBytes = Bytes.toBytes(Integer.MAX_VALUE);
+    for (Separator separator : Separator.values()) {
+      String str1 = "cl" + separator.getValue() + "us";
+      String str2 = separator.getValue() + "rst";
+      byte[] sepByteArr = Bytes.toBytes(separator.getValue());
+      byte[] longVal1Arr = Bytes.add(sepByteArr, Bytes.copy(maxLongBytes,
+          sepByteArr.length, Bytes.SIZEOF_LONG - sepByteArr.length));
+      byte[] intVal1Arr = Bytes.add(sepByteArr, Bytes.copy(maxIntBytes,
+          sepByteArr.length, Bytes.SIZEOF_INT - sepByteArr.length));
+      byte[] arr = separator.join(
+          Bytes.toBytes(separator.encode(str1)), longVal1Arr,
+          Bytes.toBytes(separator.encode(str2)), intVal1Arr);
+      int[] sizes = { Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG,
+          Separator.VARIABLE_SIZE, Bytes.SIZEOF_INT };
+      byte[][] splits = separator.split(arr, sizes);
+      assertEquals(4, splits.length);
+      assertEquals(str1, separator.decode(Bytes.toString(splits[0])));
+      assertEquals(Bytes.toLong(longVal1Arr), Bytes.toLong(splits[1]));
+      assertEquals(str2, separator.decode(Bytes.toString(splits[2])));
+      assertEquals(Bytes.toInt(intVal1Arr), Bytes.toInt(splits[3]));
+
+      longVal1Arr = Bytes.add(Bytes.copy(maxLongBytes, 0, Bytes.SIZEOF_LONG -
+          sepByteArr.length), sepByteArr);
+      intVal1Arr = Bytes.add(Bytes.copy(maxIntBytes, 0, Bytes.SIZEOF_INT -
+          sepByteArr.length), sepByteArr);
+      arr = separator.join(Bytes.toBytes(separator.encode(str1)), longVal1Arr,
+          Bytes.toBytes(separator.encode(str2)), intVal1Arr);
+      splits = separator.split(arr, sizes);
+      assertEquals(4, splits.length);
+      assertEquals(str1, separator.decode(Bytes.toString(splits[0])));
+      assertEquals(Bytes.toLong(longVal1Arr), Bytes.toLong(splits[1]));
+      assertEquals(str2, separator.decode(Bytes.toString(splits[2])));
+      assertEquals(Bytes.toInt(intVal1Arr), Bytes.toInt(splits[3]));
+
+      longVal1Arr = Bytes.add(sepByteArr, Bytes.copy(maxLongBytes,
+          sepByteArr.length, 4 - sepByteArr.length), sepByteArr);
+      longVal1Arr = Bytes.add(longVal1Arr, Bytes.copy(maxLongBytes, 4,
+          3 - sepByteArr.length), sepByteArr);
+      arr = separator.join(Bytes.toBytes(separator.encode(str1)), longVal1Arr,
+          Bytes.toBytes(separator.encode(str2)), intVal1Arr);
+      splits = separator.split(arr, sizes);
+      assertEquals(4, splits.length);
+      assertEquals(str1, separator.decode(Bytes.toString(splits[0])));
+      assertEquals(Bytes.toLong(longVal1Arr), Bytes.toLong(splits[1]));
+      assertEquals(str2, separator.decode(Bytes.toString(splits[2])));
+      assertEquals(Bytes.toInt(intVal1Arr), Bytes.toInt(splits[3]));
+
+      arr = separator.join(Bytes.toBytes(separator.encode(str1)),
+          Bytes.toBytes(separator.encode(str2)), intVal1Arr, longVal1Arr);
+      int[] sizes1 = { Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE,
+          Bytes.SIZEOF_INT, Bytes.SIZEOF_LONG };
+      splits = separator.split(arr, sizes1);
+      assertEquals(4, splits.length);
+      assertEquals(str1, separator.decode(Bytes.toString(splits[0])));
+      assertEquals(str2, separator.decode(Bytes.toString(splits[1])));
+      assertEquals(Bytes.toInt(intVal1Arr), Bytes.toInt(splits[2]));
+      assertEquals(Bytes.toLong(longVal1Arr), Bytes.toLong(splits[3]));
+
+      try {
+        int[] sizes2 = { Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE,
+            Bytes.SIZEOF_INT, 7 };
+        splits = separator.split(arr, sizes2);
+        fail("Exception should have been thrown.");
+      } catch (IllegalArgumentException e) {
+        // Expected: an invalid fixed segment size is rejected.
+      }
+
+      try {
+        int[] sizes2 = { Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE, 2,
+            Bytes.SIZEOF_LONG };
+        splits = separator.split(arr, sizes2);
+        fail("Exception should have been thrown.");
+      } catch (IllegalArgumentException e) {
+        // Expected: an invalid fixed segment size is rejected.
+      }
+    }
+  }
+
   /**
   * Simple test to encode and decode using the same separators and confirm that
    * we end up with the same as what we started with.
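
The new testSplits exercises the join/split contract: variable-size segments end at the next separator, while fixed-size segments are copied verbatim, so an embedded long or int may legally contain separator bytes. A minimal sketch of that contract, using only API that appears in this diff (values made up):

    Separator sep = Separator.QUALIFIERS;
    byte[] joined = sep.join(
        Bytes.toBytes(sep.encode("cluster")),  // escaped, variable size
        Bytes.toBytes(Long.MAX_VALUE));        // 8 raw bytes, not escaped
    byte[][] parts = sep.split(joined,
        new int[] { Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG });
    assertEquals("cluster", sep.decode(Bytes.toString(parts[0])));
    assertEquals(Long.MAX_VALUE, Bytes.toLong(parts[1]));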

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2229377/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestTimelineStorageUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestTimelineStorageUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestTimelineStorageUtils.java
deleted file mode 100644
index 046eda1..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestTimelineStorageUtils.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.hadoop.yarn.server.timelineservice.storage.common;
-
-import static org.junit.Assert.assertTrue;
-
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.junit.Test;
-
-public class TestTimelineStorageUtils {
-
-  @Test
-  public void testEncodeDecodeAppId() {
-    long currentTs = System.currentTimeMillis();
-    ApplicationId appId1 = ApplicationId.newInstance(currentTs, 1);
-    ApplicationId appId2 = ApplicationId.newInstance(currentTs, 2);
-    ApplicationId appId3 = ApplicationId.newInstance(currentTs + 300, 1);
-    String appIdStr1 = appId1.toString();
-    String appIdStr2 = appId2.toString();
-    String appIdStr3 = appId3.toString();
-    byte[] appIdBytes1 = TimelineStorageUtils.encodeAppId(appIdStr1);
-    byte[] appIdBytes2 = TimelineStorageUtils.encodeAppId(appIdStr2);
-    byte[] appIdBytes3 = TimelineStorageUtils.encodeAppId(appIdStr3);
-    // App ids' should be encoded in a manner wherein descending order
-    // is maintained.
-    assertTrue("Ordering of app ids' is incorrect",
-        Bytes.compareTo(appIdBytes1, appIdBytes2) > 0 &&
-        Bytes.compareTo(appIdBytes1, appIdBytes3) > 0 &&
-        Bytes.compareTo(appIdBytes2, appIdBytes3) > 0);
-    String decodedAppId1 = TimelineStorageUtils.decodeAppId(appIdBytes1);
-    String decodedAppId2 = TimelineStorageUtils.decodeAppId(appIdBytes2);
-    String decodedAppId3 = TimelineStorageUtils.decodeAppId(appIdBytes3);
-    assertTrue("Decoded app id is not same as the app id encoded",
-        appIdStr1.equals(decodedAppId1));
-    assertTrue("Decoded app id is not same as the app id encoded",
-        appIdStr2.equals(decodedAppId2));
-    assertTrue("Decoded app id is not same as the app id encoded",
-        appIdStr3.equals(decodedAppId3));
-  }
-}

