YARN-3706. Generalize native HBase writer for additional tables (Joep Rottinghuis via sjlee)

(cherry picked from commit 9137aeae0dec83f9eff40d12cae712dfd508c0c5)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/42ed0625
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/42ed0625
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/42ed0625

Branch: refs/heads/YARN-2928
Commit: 42ed0625da34ef8d4794140289c102920259eaf0
Parents: 623b3ab
Author: Sangjin Lee <sj...@apache.org>
Authored: Thu Jun 18 10:49:20 2015 -0700
Committer: Sangjin Lee <sj...@apache.org>
Committed: Tue Aug 25 10:47:13 2015 -0700

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../storage/EntityColumnDetails.java            | 110 ------
 .../storage/EntityColumnFamily.java             |  95 -----
 .../storage/HBaseTimelineWriterImpl.java        | 114 +++---
 .../server/timelineservice/storage/Range.java   |  59 ----
 .../storage/TimelineEntitySchemaConstants.java  |  71 ----
 .../storage/TimelineSchemaCreator.java          | 134 +-------
 .../timelineservice/storage/TimelineWriter.java |   3 +-
 .../storage/TimelineWriterUtils.java            | 344 -------------------
 .../storage/common/BaseTable.java               | 118 +++++++
 .../common/BufferedMutatorDelegator.java        |  73 ++++
 .../timelineservice/storage/common/Column.java  |  59 ++++
 .../storage/common/ColumnFamily.java            |  34 ++
 .../storage/common/ColumnHelper.java            | 247 +++++++++++++
 .../storage/common/ColumnPrefix.java            |  83 +++++
 .../timelineservice/storage/common/Range.java   |  59 ++++
 .../storage/common/Separator.java               | 303 ++++++++++++++++
 .../common/TimelineEntitySchemaConstants.java   |  68 ++++
 .../storage/common/TimelineWriterUtils.java     | 127 +++++++
 .../storage/common/TypedBufferedMutator.java    |  28 ++
 .../storage/common/package-info.java            |  24 ++
 .../storage/entity/EntityColumn.java            | 141 ++++++++
 .../storage/entity/EntityColumnFamily.java      |  65 ++++
 .../storage/entity/EntityColumnPrefix.java      | 212 ++++++++++++
 .../storage/entity/EntityRowKey.java            |  93 +++++
 .../storage/entity/EntityTable.java             | 161 +++++++++
 .../storage/entity/package-info.java            |  25 ++
 .../storage/TestHBaseTimelineWriterImpl.java    | 252 ++++++++------
 .../storage/common/TestSeparator.java           | 129 +++++++
 .../storage/common/TestTimelineWriterUtils.java |  29 ++
 30 files changed, 2301 insertions(+), 962 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/42ed0625/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 1cf1f26..1e85cdf 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -93,6 +93,9 @@ Branch YARN-2928: Timeline Server Next Generation: Phase 1
     YARN-3276. Code cleanup for timeline service API records. (Junping Du via
     zjshen)
 
+    YARN-3706. Generalize native HBase writer for additional tables (Joep
+    Rottinghuis via sjlee)
+
   OPTIMIZATIONS
 
   BUG FIXES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/42ed0625/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/EntityColumnDetails.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/EntityColumnDetails.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/EntityColumnDetails.java
deleted file mode 100644
index 2894c41..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/EntityColumnDetails.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.timelineservice.storage;
-
-import java.io.IOException;
-import java.util.Set;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.hbase.client.BufferedMutator;
-import org.apache.hadoop.hbase.util.Bytes;
-
-/**
- * Contains the Info Column Family details like Column names, types and byte
- * representations for
- * {@link org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity}
- * object that is stored in hbase Also has utility functions for storing each of
- * these to the backend
- */
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-enum EntityColumnDetails {
-  ID(EntityColumnFamily.INFO, "id"),
-  TYPE(EntityColumnFamily.INFO, "type"),
-  CREATED_TIME(EntityColumnFamily.INFO, "created_time"),
-  MODIFIED_TIME(EntityColumnFamily.INFO, "modified_time"),
-  FLOW_VERSION(EntityColumnFamily.INFO, "flow_version"),
-  PREFIX_IS_RELATED_TO(EntityColumnFamily.INFO, "r"),
-  PREFIX_RELATES_TO(EntityColumnFamily.INFO, "s"),
-  PREFIX_EVENTS(EntityColumnFamily.INFO, "e");
-
-  private final EntityColumnFamily columnFamily;
-  private final String value;
-  private final byte[] inBytes;
-
-  private EntityColumnDetails(EntityColumnFamily columnFamily, 
-      String value) {
-    this.columnFamily = columnFamily;
-    this.value = value;
-    this.inBytes = Bytes.toBytes(this.value.toLowerCase());
-  }
-
-  public String getValue() {
-    return value;
-  }
-
-  byte[] getInBytes() {
-    return inBytes;
-  }
-
-  void store(byte[] rowKey, BufferedMutator entityTable, Object inputValue)
-      throws IOException {
-    TimelineWriterUtils.store(rowKey, entityTable,
-        this.columnFamily.getInBytes(), null, this.getInBytes(), inputValue,
-        null);
-  }
-
-  /**
-   * stores events data with column prefix
-   */
-  void store(byte[] rowKey, BufferedMutator entityTable, byte[] idBytes,
-      String key, Object inputValue) throws IOException {
-    TimelineWriterUtils.store(rowKey, entityTable,
-        this.columnFamily.getInBytes(),
-        // column prefix
-        TimelineWriterUtils.join(
-            TimelineEntitySchemaConstants.ROW_KEY_SEPARATOR_BYTES,
-            this.getInBytes(), idBytes),
-        // column qualifier
-        Bytes.toBytes(key),
-        inputValue, null);
-  }
-
-  /**
-   * stores relation entities with a column prefix
-   */
-  void store(byte[] rowKey, BufferedMutator entityTable, String key,
-      Set<String> inputValue) throws IOException {
-    TimelineWriterUtils.store(rowKey, entityTable,
-        this.columnFamily.getInBytes(),
-        // column prefix
-        this.getInBytes(),
-        // column qualifier
-        Bytes.toBytes(key),
-        // value
-        TimelineWriterUtils.getValueAsString(
-            TimelineEntitySchemaConstants.ROW_KEY_SEPARATOR, inputValue),
-        // cell timestamp
-        null);
-  }
-
-  // TODO add a method that accepts a byte array,
-  // iterates over the enum and returns an enum from those bytes
-
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/42ed0625/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/EntityColumnFamily.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/EntityColumnFamily.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/EntityColumnFamily.java
deleted file mode 100644
index e556351..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/EntityColumnFamily.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.timelineservice.storage;
-
-import java.io.IOException;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.hbase.client.BufferedMutator;
-import org.apache.hadoop.hbase.util.Bytes;
-
-/**
- * Contains the Column family names and byte representations for
- * {@link org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity}
- * object that is stored in hbase
- * Also has utility functions for storing each of these to the backend
- */
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-enum EntityColumnFamily {
-  INFO("i"),
-  CONFIG("c"),
-  METRICS("m");
-
-  private final String value;
-  private final byte[] inBytes;
-
-  private EntityColumnFamily(String value) {
-    this.value = value;
-    this.inBytes = Bytes.toBytes(this.value.toLowerCase());
-  }
-
-  byte[] getInBytes() {
-    return inBytes;
-  }
-
-  public String getValue() {
-    return value;
-  }
-
-  /**
-   * stores the key as column and value as hbase column value in the given
-   * column family in the entity table
-   *
-   * @param rowKey
-   * @param entityTable
-   * @param inputValue
-   * @throws IOException
-   */
-  public void store(byte[] rowKey, BufferedMutator entityTable, String key,
-      String inputValue) throws IOException {
-    if (key == null) {
-      return;
-    }
-    TimelineWriterUtils.store(rowKey, entityTable, this.getInBytes(), null,
-        Bytes.toBytes(key), inputValue, null);
-  }
-
-  /**
-   * stores the values along with cell timestamp
-   *
-   * @param rowKey
-   * @param entityTable
-   * @param key
-   * @param timestamp
-   * @param inputValue
-   * @throws IOException
-   */
-  public void store(byte[] rowKey, BufferedMutator entityTable, String key,
-      Long timestamp, Number inputValue) throws IOException {
-    if (key == null) {
-      return;
-    }
-    TimelineWriterUtils.store(rowKey, entityTable, this.getInBytes(), null,
-        Bytes.toBytes(key), inputValue, timestamp);
-  }
-
-  // TODO add a method that accepts a byte array,
-  // iterates over the enum and returns an enum from those bytes
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/42ed0625/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
index aa71c6c..e48ca60 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
@@ -26,19 +26,22 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
-import org.apache.hadoop.hbase.client.BufferedMutator;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineEntitySchemaConstants;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
 
 /**
  * This implements a hbase based backend for storing application timeline entity
@@ -50,7 +53,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
     TimelineWriter {
 
   private Connection conn;
-  private BufferedMutator entityTable;
+  private TypedBufferedMutator<EntityTable> entityTable;
 
   private static final Log LOG = LogFactory
       .getLog(HBaseTimelineWriterImpl.class);
@@ -72,10 +75,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
     super.serviceInit(conf);
     Configuration hbaseConf = HBaseConfiguration.create(conf);
     conn = ConnectionFactory.createConnection(hbaseConf);
-    TableName entityTableName = TableName.valueOf(hbaseConf.get(
-        TimelineEntitySchemaConstants.ENTITY_TABLE_NAME,
-        TimelineEntitySchemaConstants.DEFAULT_ENTITY_TABLE_NAME));
-    entityTable = conn.getBufferedMutator(entityTableName);
+    entityTable = new EntityTable().getTableMutator(hbaseConf, conn);
   }
 
   /**
@@ -86,9 +86,6 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
       String flowName, String flowVersion, long flowRunId, String appId,
       TimelineEntities data) throws IOException {
 
-    byte[] rowKeyPrefix = TimelineWriterUtils.getRowKeyPrefix(clusterId,
-        userId, flowName, flowRunId, appId);
-
     TimelineWriteResponse putStatus = new TimelineWriteResponse();
     for (TimelineEntity te : data.getEntities()) {
 
@@ -96,19 +93,19 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
       if (te == null) {
         continue;
       }
-      // get row key
-      byte[] row = TimelineWriterUtils.join(
-          TimelineEntitySchemaConstants.ROW_KEY_SEPARATOR_BYTES, rowKeyPrefix,
-          Bytes.toBytes(te.getType()), Bytes.toBytes(te.getId()));
-
-      storeInfo(row, te, flowVersion);
-      storeEvents(row, te.getEvents());
-      storeConfig(row, te.getConfigs());
-      storeMetrics(row, te.getMetrics());
-      storeRelations(row, te.getIsRelatedToEntities(),
-          EntityColumnDetails.PREFIX_IS_RELATED_TO);
-      storeRelations(row, te.getRelatesToEntities(),
-          EntityColumnDetails.PREFIX_RELATES_TO);
+
+      byte[] rowKey =
+          EntityRowKey.getRowKey(clusterId, userId, flowName, flowRunId, appId,
+              te);
+
+      storeInfo(rowKey, te, flowVersion);
+      storeEvents(rowKey, te.getEvents());
+      storeConfig(rowKey, te.getConfigs());
+      storeMetrics(rowKey, te.getMetrics());
+      storeRelations(rowKey, te.getIsRelatedToEntities(),
+          EntityColumnPrefix.IS_RELATED_TO);
+      storeRelations(rowKey, te.getRelatesToEntities(),
+          EntityColumnPrefix.RELATES_TO);
     }
 
     return putStatus;
@@ -119,10 +116,15 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
    */
   private void storeRelations(byte[] rowKey,
       Map<String, Set<String>> connectedEntities,
-      EntityColumnDetails columnNamePrefix) throws IOException {
-    for (Map.Entry<String, Set<String>> entry : connectedEntities.entrySet()) {
-      columnNamePrefix.store(rowKey, entityTable, entry.getKey(),
-          entry.getValue());
+      EntityColumnPrefix entityColumnPrefix) throws IOException {
+    for (Map.Entry<String, Set<String>> connectedEntity : connectedEntities
+        .entrySet()) {
+      // id3?id4?id5
+      String compoundValue =
+          Separator.VALUES.joinEncoded(connectedEntity.getValue());
+
+      entityColumnPrefix.store(rowKey, entityTable, connectedEntity.getKey(),
+          null, compoundValue);
     }
   }
 
@@ -132,13 +134,13 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
   private void storeInfo(byte[] rowKey, TimelineEntity te, String flowVersion)
       throws IOException {
 
-    EntityColumnDetails.ID.store(rowKey, entityTable, te.getId());
-    EntityColumnDetails.TYPE.store(rowKey, entityTable, te.getType());
-    EntityColumnDetails.CREATED_TIME.store(rowKey, entityTable,
+    EntityColumn.ID.store(rowKey, entityTable, null, te.getId());
+    EntityColumn.TYPE.store(rowKey, entityTable, null, te.getType());
+    EntityColumn.CREATED_TIME.store(rowKey, entityTable, null,
         te.getCreatedTime());
-    EntityColumnDetails.MODIFIED_TIME.store(rowKey, entityTable,
+    EntityColumn.MODIFIED_TIME.store(rowKey, entityTable, null,
         te.getModifiedTime());
-    EntityColumnDetails.FLOW_VERSION.store(rowKey, entityTable, flowVersion);
+    EntityColumn.FLOW_VERSION.store(rowKey, entityTable, null, flowVersion);
   }
 
   /**
@@ -150,8 +152,8 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
       return;
     }
     for (Map.Entry<String, String> entry : config.entrySet()) {
-      EntityColumnFamily.CONFIG.store(rowKey, entityTable,
-          entry.getKey(), entry.getValue());
+      EntityColumnPrefix.CONFIG.store(rowKey, entityTable, entry.getKey(),
+          null, entry.getValue());
     }
   }
 
@@ -163,11 +165,12 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
       throws IOException {
     if (metrics != null) {
       for (TimelineMetric metric : metrics) {
-        String key = metric.getId();
+        String metricColumnQualifier = metric.getId();
         Map<Long, Number> timeseries = metric.getValues();
-        for (Map.Entry<Long, Number> entry : timeseries.entrySet()) {
-          EntityColumnFamily.METRICS.store(rowKey, entityTable, key,
-              entry.getKey(), entry.getValue());
+        for (Map.Entry<Long, Number> timeseriesEntry : timeseries.entrySet()) {
+          Long timestamp = timeseriesEntry.getKey();
+          EntityColumnPrefix.METRIC.store(rowKey, entityTable,
+              metricColumnQualifier, timestamp, timeseriesEntry.getValue());
         }
       }
     }
@@ -181,19 +184,27 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
     if (events != null) {
       for (TimelineEvent event : events) {
         if (event != null) {
-          String id = event.getId();
-          if (id != null) {
-            byte[] idBytes = Bytes.toBytes(id);
+          String eventId = event.getId();
+          if (eventId != null) {
             Map<String, Object> eventInfo = event.getInfo();
             if (eventInfo != null) {
               for (Map.Entry<String, Object> info : eventInfo.entrySet()) {
-                EntityColumnDetails.PREFIX_EVENTS.store(rowKey,
-                    entityTable, idBytes, info.getKey(), info.getValue());
-              }
+                // eventId?infoKey
+                byte[] columnQualifierFirst =
+                    Bytes.toBytes(Separator.VALUES.encode(eventId));
+                byte[] compoundColumnQualifierBytes =
+                    Separator.VALUES.join(columnQualifierFirst,
+                        Bytes.toBytes(info.getKey()));
+                // convert back to string to avoid additional API on store.
+                String compoundColumnQualifier =
+                    Bytes.toString(compoundColumnQualifierBytes);
+                EntityColumnPrefix.METRIC.store(rowKey, entityTable,
+                    compoundColumnQualifier, null, info.getValue());
+              } // for info: eventInfo
             }
           }
         }
-      }
+      } // event : events
     }
   }
 
@@ -204,8 +215,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
   }
 
   /**
-   * close the hbase connections
-   * The close APIs perform flushing and release any
+   * close the hbase connections The close APIs perform flushing and release any
    * resources held
    */
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/42ed0625/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/Range.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/Range.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/Range.java
deleted file mode 100644
index 2a2db81..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/Range.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.timelineservice.storage;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-public class Range {
-  private final int startIdx;
-  private final int endIdx;
-
-  /**
-   * Defines a range from start index (inclusive) to end index (exclusive).
-   *
-   * @param start
-   *          Starting index position
-   * @param end
-   *          Ending index position (exclusive)
-   */
-  public Range(int start, int end) {
-    if (start < 0 || end < start) {
-      throw new IllegalArgumentException(
-          "Invalid range, required that: 0 <= start <= end; start=" + start
-              + ", end=" + end);
-    }
-
-    this.startIdx = start;
-    this.endIdx = end;
-  }
-
-  public int start() {
-    return startIdx;
-  }
-
-  public int end() {
-    return endIdx;
-  }
-
-  public int length() {
-    return endIdx - startIdx;
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/42ed0625/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntitySchemaConstants.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntitySchemaConstants.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntitySchemaConstants.java
deleted file mode 100644
index d95cbb2..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntitySchemaConstants.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.timelineservice.storage;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-
-/**
- * contains the constants used in the context of schema accesses for
- * {@link org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity}
- * information
- */
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-public class TimelineEntitySchemaConstants {
-
-  /** entity prefix */
-  public static final String ENTITY_PREFIX =
-      YarnConfiguration.TIMELINE_SERVICE_PREFIX
-      + ".entity";
-
-  /** config param name that specifies the entity table name */
-  public static final String ENTITY_TABLE_NAME = ENTITY_PREFIX
-      + ".table.name";
-
-  /**
-   * config param name that specifies the TTL for metrics column family in
-   * entity table
-   */
-  public static final String ENTITY_TABLE_METRICS_TTL = ENTITY_PREFIX
-      + ".table.metrics.ttl";
-
-  /** default value for entity table name */
-  public static final String DEFAULT_ENTITY_TABLE_NAME = "timelineservice.entity";
-
-  /** in bytes default value for entity table name */
-  static final byte[] DEFAULT_ENTITY_TABLE_NAME_BYTES = Bytes
-      .toBytes(DEFAULT_ENTITY_TABLE_NAME);
-
-  /** separator in row key */
-  public static final String ROW_KEY_SEPARATOR = "!";
-
-  /** byte representation of the separator in row key */
-  static final byte[] ROW_KEY_SEPARATOR_BYTES = Bytes
-      .toBytes(ROW_KEY_SEPARATOR);
-
-  public static final byte ZERO_BYTES = 0;
-
-  /** default TTL is 30 days for metrics timeseries */
-  public static final int ENTITY_TABLE_METRICS_TTL_DEFAULT = 2592000;
-
-  /** default max number of versions */
-  public static final int ENTITY_TABLE_METRICS_MAX_VERSIONS_DEFAULT = 1000;
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/42ed0625/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java
index 820a6d1..a5cc2ab 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java
@@ -19,21 +19,6 @@ package org.apache.hadoop.yarn.server.timelineservice.storage;
 
 import java.io.IOException;
 
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.regionserver.BloomType;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.HelpFormatter;
@@ -41,7 +26,18 @@ import org.apache.commons.cli.Option;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
 import org.apache.commons.cli.PosixParser;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
 
 /**
  * This creates the schema for a hbase based backend for storing application
@@ -53,18 +49,6 @@ public class TimelineSchemaCreator {
 
   final static String NAME = TimelineSchemaCreator.class.getSimpleName();
   private static final Log LOG = 
LogFactory.getLog(TimelineSchemaCreator.class);
-  final static byte[][] splits = { Bytes.toBytes("a"), Bytes.toBytes("ad"),
-      Bytes.toBytes("an"), Bytes.toBytes("b"), Bytes.toBytes("ca"),
-      Bytes.toBytes("cl"), Bytes.toBytes("d"), Bytes.toBytes("e"),
-      Bytes.toBytes("f"), Bytes.toBytes("g"), Bytes.toBytes("h"),
-      Bytes.toBytes("i"), Bytes.toBytes("j"), Bytes.toBytes("k"),
-      Bytes.toBytes("l"), Bytes.toBytes("m"), Bytes.toBytes("n"),
-      Bytes.toBytes("o"), Bytes.toBytes("q"), Bytes.toBytes("r"),
-      Bytes.toBytes("s"), Bytes.toBytes("se"), Bytes.toBytes("t"),
-      Bytes.toBytes("u"), Bytes.toBytes("v"), Bytes.toBytes("w"),
-      Bytes.toBytes("x"), Bytes.toBytes("y"), Bytes.toBytes("z") };
-
-  public static final String SPLIT_KEY_PREFIX_LENGTH = "4";
 
   public static void main(String[] args) throws Exception {
 
@@ -79,13 +63,12 @@ public class TimelineSchemaCreator {
     // Grab the entityTableName argument
     String entityTableName = commandLine.getOptionValue("e");
     if (StringUtils.isNotBlank(entityTableName)) {
-      hbaseConf.set(TimelineEntitySchemaConstants.ENTITY_TABLE_NAME,
-          entityTableName);
+      hbaseConf.set(EntityTable.TABLE_NAME_CONF_NAME, entityTableName);
     }
-    String entityTable_TTL_Metrics = commandLine.getOptionValue("m");
-    if (StringUtils.isNotBlank(entityTable_TTL_Metrics)) {
-      hbaseConf.set(TimelineEntitySchemaConstants.ENTITY_TABLE_METRICS_TTL,
-          entityTable_TTL_Metrics);
+    String entityTableTTLMetrics = commandLine.getOptionValue("m");
+    if (StringUtils.isNotBlank(entityTableTTLMetrics)) {
+      int metricsTTL = Integer.parseInt(entityTableTTLMetrics);
+      new EntityTable().setMetricsTTL(metricsTTL, hbaseConf);
     }
     createAllTables(hbaseConf);
   }
@@ -136,7 +119,7 @@ public class TimelineSchemaCreator {
       if (admin == null) {
         throw new IOException("Cannot create table since admin is null");
       }
-      createTimelineEntityTable(admin, hbaseConf);
+      new EntityTable().createTable(admin, hbaseConf);
     } finally {
       if (conn != null) {
         conn.close();
@@ -144,88 +127,5 @@ public class TimelineSchemaCreator {
     }
   }
 
-  /**
-   * Creates a table with column families info, config and metrics
-   * info stores information about a timeline entity object
-   * config stores configuration data of a timeline entity object
-   * metrics stores the metrics of a timeline entity object
-   *
-   * Example entity table record:
-   * <pre>
-   *|------------------------------------------------------------|
-   *|  Row       | Column Family  | Column Family | Column Family|
-   *|  key       | info           | metrics       | config       |
-   *|------------------------------------------------------------|
-   *| userName!  | id:entityId    | metricName1:  | configKey1:  |
-   *| clusterId! |                | metricValue1  | configValue1 |
-   *| flowId!    | type:entityType| @timestamp1   |              |
-   *| flowRunId! |                |               | configKey2:  |
-   *| AppId!     | created_time:  | metricName1:  | configValue2 |
-   *| entityType!| 1392993084018  | metricValue2  |              |
-   *| entityId   |                | @timestamp2   |              |
-   *|            | modified_time: |               |              |
-   *|            | 1392995081012  | metricName2:  |              |
-   *|            |                | metricValue1  |              |
-   *|            | r!relatesToKey:| @timestamp2   |              |
-   *|            | id3!id4!id5    |               |              |
-   *|            |                |               |              |
-   *|            | s!isRelatedToKey|              |              |
-   *|            | id7!id9!id5    |               |              |
-   *|            |                |               |              |
-   *|            | e!eventKey:    |               |              |
-   *|            | eventValue     |               |              |
-   *|            |                |               |              |
-   *|            | flowVersion:   |               |              |
-   *|            | versionValue   |               |              |
-   *|------------------------------------------------------------|
-   *</pre>
-   * @param admin
-   * @param hbaseConf
-   * @throws IOException
-   */
-  public static void createTimelineEntityTable(Admin admin,
-      Configuration hbaseConf) throws IOException {
-
-    TableName table = TableName.valueOf(hbaseConf.get(
-        TimelineEntitySchemaConstants.ENTITY_TABLE_NAME,
-        TimelineEntitySchemaConstants.DEFAULT_ENTITY_TABLE_NAME));
-    if (admin.tableExists(table)) {
-      // do not disable / delete existing table
-      // similar to the approach taken by map-reduce jobs when
-      // output directory exists
-      throw new IOException("Table " + table.getNameAsString()
-          + " already exists.");
-    }
-
-    HTableDescriptor entityTableDescp = new HTableDescriptor(table);
-    HColumnDescriptor cf1 = new HColumnDescriptor(
-        EntityColumnFamily.INFO.getInBytes());
-    cf1.setBloomFilterType(BloomType.ROWCOL);
-    entityTableDescp.addFamily(cf1);
-
-    HColumnDescriptor cf2 = new HColumnDescriptor(
-        EntityColumnFamily.CONFIG.getInBytes());
-    cf2.setBloomFilterType(BloomType.ROWCOL);
-    cf2.setBlockCacheEnabled(true);
-    entityTableDescp.addFamily(cf2);
-
-    HColumnDescriptor cf3 = new HColumnDescriptor(
-        EntityColumnFamily.METRICS.getInBytes());
-    entityTableDescp.addFamily(cf3);
-    cf3.setBlockCacheEnabled(true);
-    // always keep 1 version (the latest)
-    cf3.setMinVersions(1);
-    
cf3.setMaxVersions(TimelineEntitySchemaConstants.ENTITY_TABLE_METRICS_MAX_VERSIONS_DEFAULT);
-    cf3.setTimeToLive(hbaseConf.getInt(
-        TimelineEntitySchemaConstants.ENTITY_TABLE_METRICS_TTL,
-        TimelineEntitySchemaConstants.ENTITY_TABLE_METRICS_TTL_DEFAULT));
-    entityTableDescp
-        
.setRegionSplitPolicyClassName("org.apache.hadoop.hbase.regionserver.KeyPrefixRegionSplitPolicy");
-    entityTableDescp.setValue("KeyPrefixRegionSplitPolicy.prefix_length",
-        SPLIT_KEY_PREFIX_LENGTH);
-    admin.createTable(entityTableDescp, splits);
-    LOG.info("Status of table creation for " + table.getNameAsString() + "="
-        + admin.tableExists(table));
 
-  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/42ed0625/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java
index 467bcec..494e8ad 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java
@@ -15,17 +15,16 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.hadoop.yarn.server.timelineservice.storage;
 
 import java.io.IOException;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.service.Service;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
 import 
org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
-import org.apache.hadoop.service.Service;
 
 /**
  * This interface is for storing application timeline information.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/42ed0625/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriterUtils.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriterUtils.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriterUtils.java
deleted file mode 100644
index 113935e..0000000
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriterUtils.java
+++ /dev/null
@@ -1,344 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.timelineservice.storage;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.client.BufferedMutator;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
-import org.apache.hadoop.yarn.server.timelineservice.storage.Range;
-
-/**
- * bunch of utility functions used across TimelineWriter classes
- */
-@InterfaceAudience.Public
-@InterfaceStability.Unstable
-public class TimelineWriterUtils {
-
-  /** empty bytes */
-  public static final byte[] EMPTY_BYTES = new byte[0];
-  private static final String SPACE = " ";
-  private static final String UNDERSCORE = "_";
-  private static final String EMPTY_STRING = "";
-
-  /**
-   * Returns a single byte array containing all of the individual component
-   * arrays separated by the separator array.
-   *
-   * @param separator
-   * @param components
-   * @return byte array after joining the components
-   */
-  public static byte[] join(byte[] separator, byte[]... components) {
-    if (components == null || components.length == 0) {
-      return EMPTY_BYTES;
-    }
-
-    int finalSize = 0;
-    if (separator != null) {
-      finalSize = separator.length * (components.length - 1);
-    }
-    for (byte[] comp : components) {
-      if (comp != null) {
-        finalSize += comp.length;
-      }
-    }
-
-    byte[] buf = new byte[finalSize];
-    int offset = 0;
-    for (int i = 0; i < components.length; i++) {
-      if (components[i] != null) {
-        System.arraycopy(components[i], 0, buf, offset, components[i].length);
-        offset += components[i].length;
-        if (i < (components.length - 1) && separator != null
-            && separator.length > 0) {
-          System.arraycopy(separator, 0, buf, offset, separator.length);
-          offset += separator.length;
-        }
-      }
-    }
-    return buf;
-  }
-
-  /**
-   * Splits the source array into multiple array segments using the given
-   * separator, up to a maximum of count items. This will naturally produce
-   * copied byte arrays for each of the split segments. To identify the split
-   * ranges without the array copies, see
-   * {@link TimelineWriterUtils#splitRanges(byte[], byte[])}.
-   *
-   * @param source
-   * @param separator
-   * @return byte[] array after splitting the source
-   */
-  public static byte[][] split(byte[] source, byte[] separator) {
-    return split(source, separator, -1);
-  }
-
-  /**
-   * Splits the source array into multiple array segments using the given
-   * separator, up to a maximum of count items. This will naturally produce
-   * copied byte arrays for each of the split segments. To identify the split
-   * ranges without the array copies, see
-   * {@link TimelineWriterUtils#splitRanges(byte[], byte[])}.
-   *
-   * @param source
-   * @param separator
-   * @param limit
-   * @return byte[][] after splitting the input source
-   */
-  public static byte[][] split(byte[] source, byte[] separator, int limit) {
-    List<Range> segments = splitRanges(source, separator, limit);
-
-    byte[][] splits = new byte[segments.size()][];
-    for (int i = 0; i < segments.size(); i++) {
-      Range r = segments.get(i);
-      byte[] tmp = new byte[r.length()];
-      if (tmp.length > 0) {
-        System.arraycopy(source, r.start(), tmp, 0, r.length());
-      }
-      splits[i] = tmp;
-    }
-    return splits;
-  }
-
-  /**
-   * Returns a list of ranges identifying [start, end) -- closed, open --
-   * positions within the source byte array that would be split using the
-   * separator byte array.
-   */
-  public static List<Range> splitRanges(byte[] source, byte[] separator) {
-    return splitRanges(source, separator, -1);
-  }
-
-  /**
-   * Returns a list of ranges identifying [start, end) -- closed, open --
-   * positions within the source byte array that would be split using the
-   * separator byte array.
-   * @param source the source data
-   * @param separator the separator pattern to look for
-   * @param limit the maximum number of splits to identify in the source
-   */
-  public static List<Range> splitRanges(byte[] source, byte[] separator, int 
limit) {
-    List<Range> segments = new ArrayList<Range>();
-    if ((source == null) || (separator == null)) {
-      return segments;
-    }
-    int start = 0;
-    itersource: for (int i = 0; i < source.length; i++) {
-      for (int j = 0; j < separator.length; j++) {
-        if (source[i + j] != separator[j]) {
-          continue itersource;
-        }
-      }
-      // all separator elements matched
-      if (limit > 0 && segments.size() >= (limit-1)) {
-        // everything else goes in one final segment
-        break;
-      }
-
-      segments.add(new Range(start, i));
-      start = i + separator.length;
-      // i will be incremented again in outer for loop
-      i += separator.length-1;
-    }
-    // add in remaining to a final range
-    if (start <= source.length) {
-      segments.add(new Range(start, source.length));
-    }
-    return segments;
-  }
-
-  /**
-   * converts run id into it's inverse timestamp
-   * @param flowRunId
-   * @return inverted long
-   */
-  public static long encodeRunId(Long flowRunId) {
-    return Long.MAX_VALUE - flowRunId;
-  }
-
-  /**
-   * return a value from the Map as a String
-   * @param key
-   * @param values
-   * @return value as a String or ""
-   * @throws IOException 
-   */
-  public static String getValueAsString(final byte[] key,
-      final Map<byte[], byte[]> values) throws IOException {
-    if( values == null ) {
-      return EMPTY_STRING;
-    }
-    byte[] value = values.get(key);
-    if (value != null) {
-      return GenericObjectMapper.read(value).toString();
-    } else {
-      return EMPTY_STRING;
-    }
-  }
-
-  /**
-   * return a value from the Map as a long
-   * @param key
-   * @param values
-   * @return value as Long or 0L
-   * @throws IOException 
-   */
-  public static long getValueAsLong(final byte[] key,
-      final Map<byte[], byte[]> values) throws IOException {
-    if (values == null) {
-      return 0;
-    }
-    byte[] value = values.get(key);
-    if (value != null) {
-      Number val = (Number) GenericObjectMapper.read(value);
-      return val.longValue();
-    } else {
-      return 0L;
-    }
-  }
-
-  /**
-   * concates the values from a Set<Strings> to return a single delimited 
string value
-   * @param rowKeySeparator
-   * @param values
-   * @return Value from the set of strings as a string
-   */
-  public static String getValueAsString(String rowKeySeparator,
-      Set<String> values) {
-
-    if (values == null) {
-      return EMPTY_STRING;
-    }
-    StringBuilder concatStrings = new StringBuilder();
-    for (String value : values) {
-      concatStrings.append(value);
-      concatStrings.append(rowKeySeparator);
-    }
-    // remove the last separator
-    if(concatStrings.length() > 1) {
-      concatStrings.deleteCharAt(concatStrings.lastIndexOf(rowKeySeparator));
-    }
-    return concatStrings.toString();
-  }
-  /**
-   * Constructs a row key prefix for the entity table
-   * @param clusterId
-   * @param userId
-   * @param flowId
-   * @param flowRunId
-   * @param appId
-   * @return byte array with the row key prefix
-   */
-  static byte[] getRowKeyPrefix(String clusterId, String userId, String flowId,
-      Long flowRunId, String appId) {
-    return TimelineWriterUtils.join(
-        TimelineEntitySchemaConstants.ROW_KEY_SEPARATOR_BYTES,
-        Bytes.toBytes(cleanse(userId)), Bytes.toBytes(cleanse(clusterId)),
-        Bytes.toBytes(cleanse(flowId)),
-        Bytes.toBytes(TimelineWriterUtils.encodeRunId(flowRunId)),
-        Bytes.toBytes(cleanse(appId)));
- }
-
-  /**
-   * Takes a string token to be used as a key or qualifier and
-   * cleanses out reserved tokens.
-   * This operation is not symmetrical.
-   * Logic is to replace all spaces and separator chars in input with
-   * underscores.
-   *
-   * @param token token to cleanse.
-   * @return String with no spaces and no separator chars
-   */
-  public static String cleanse(String token) {
-    if (token == null || token.length() == 0) {
-      return token;
-    }
-
-    String cleansed = token.replaceAll(SPACE, UNDERSCORE);
-    cleansed = cleansed.replaceAll(
-        TimelineEntitySchemaConstants.ROW_KEY_SEPARATOR, UNDERSCORE);
-
-    return cleansed;
-  }
-
-  /**
-   * stores the info to the table in hbase
-   * 
-   * @param rowKey
-   * @param table
-   * @param columnFamily
-   * @param columnPrefix
-   * @param columnQualifier
-   * @param inputValue
-   * @param cellTimeStamp
-   * @throws IOException
-   */
-  public static void store(byte[] rowKey, BufferedMutator table, byte[] 
columnFamily,
-      byte[] columnPrefix, byte[] columnQualifier, Object inputValue,
-      Long cellTimeStamp) throws IOException {
-    if ((rowKey == null) || (table == null) || (columnFamily == null)
-        || (columnQualifier == null) || (inputValue == null)) {
-      return;
-    }
-
-    Put p = null;
-    if (cellTimeStamp == null) {
-      if (columnPrefix != null) {
-        // store with prefix
-        p = new Put(rowKey);
-        p.addColumn(
-            columnFamily,
-            join(TimelineEntitySchemaConstants.ROW_KEY_SEPARATOR_BYTES,
-                columnPrefix, columnQualifier), GenericObjectMapper
-                .write(inputValue));
-      } else {
-        // store without prefix
-        p = new Put(rowKey);
-        p.addColumn(columnFamily, columnQualifier,
-            GenericObjectMapper.write(inputValue));
-      }
-    } else {
-      // store with cell timestamp
-      Cell cell = CellUtil.createCell(rowKey, columnFamily, columnQualifier,
-          // set the cell timestamp
-          cellTimeStamp,
-          // KeyValue Type minimum
-          TimelineEntitySchemaConstants.ZERO_BYTES,
-          GenericObjectMapper.write(inputValue));
-      p = new Put(rowKey);
-      p.add(cell);
-    }
-    if (p != null) {
-      table.mutate(p);
-    }
-
-  }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/42ed0625/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BaseTable.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BaseTable.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BaseTable.java
new file mode 100644
index 0000000..e8d8b5c
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BaseTable.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.BufferedMutator;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+
+/**
+ * Implements behavior common to tables used in the timeline service storage.
+ *
+ * @param <T> reference to the table instance class itself for type safety.
+ */
+public abstract class BaseTable<T> {
+
+  /**
+   * Name of config variable that is used to point to this table
+   */
+  private final String tableNameConfName;
+
+  /**
+   * Unless the configuration overrides, this will be the default name for the
+   * table when it is created.
+   */
+  private final String defaultTableName;
+
+  /**
+   * @param tableNameConfName name of config variable that points to this table.
+   * @param defaultTableName table name used unless the configuration overrides.
+   */
+  protected BaseTable(String tableNameConfName, String defaultTableName) {
+    this.tableNameConfName = tableNameConfName;
+    this.defaultTableName = defaultTableName;
+  }
+
+  /**
+   * Used to create a type-safe mutator for this table.
+   *
+   * @param hbaseConf used to read table name
+   * @param conn used to create a table from.
+   * @return a type safe {@link BufferedMutator} for this table.
+   * @throws IOException
+   */
+  public TypedBufferedMutator<T> getTableMutator(Configuration hbaseConf,
+      Connection conn) throws IOException {
+
+    TableName tableName = this.getTableName(hbaseConf);
+
+    // Plain buffered mutator
+    BufferedMutator bufferedMutator = conn.getBufferedMutator(tableName);
+
+    // Now make this thing type safe.
+    // This is how service initialization should hang on to this variable, with
+    // the proper type
+    TypedBufferedMutator<T> table =
+        new BufferedMutatorDelegator<T>(bufferedMutator);
+
+    return table;
+  }
+
+  /**
+   * @param hbaseConf used to read settings that override defaults
+   * @param conn used to create table from
+   * @param scan that specifies what you want to read from this table.
+   * @return scanner for the table.
+   * @throws IOException
+   */
+  public ResultScanner getResultScanner(Configuration hbaseConf,
+      Connection conn, Scan scan) throws IOException {
+    Table table = conn.getTable(getTableName(hbaseConf));
+    return table.getScanner(scan);
+  }
+
+  /**
+   * Get the table name for this table.
+   *
+   * @param hbaseConf
+   */
+  public TableName getTableName(Configuration hbaseConf) {
+    TableName table =
+        TableName.valueOf(hbaseConf.get(tableNameConfName, defaultTableName));
+    return table;
+
+  }
+
+  /**
+   * Used to create the table in HBase. Should be called only once (per HBase
+   * instance).
+   *
+   * @param admin
+   * @param hbaseConf
+   */
+  public abstract void createTable(Admin admin, Configuration hbaseConf)
+      throws IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/42ed0625/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BufferedMutatorDelegator.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BufferedMutatorDelegator.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BufferedMutatorDelegator.java
new file mode 100644
index 0000000..fe8f9c6
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BufferedMutatorDelegator.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.BufferedMutator;
+import org.apache.hadoop.hbase.client.Mutation;
+
+/**
+ * To be used to wrap an actual {@link BufferedMutator} in a type safe manner
+ *
+ * @param <T> The class referring to the table to be written to.
+ */
+class BufferedMutatorDelegator<T> implements TypedBufferedMutator<T> {
+
+  private final BufferedMutator bufferedMutator;
+
+  /**
+   * @param bufferedMutator the mutator to be wrapped for delegation. Shall not
+   *          be null.
+   */
+  public BufferedMutatorDelegator(BufferedMutator bufferedMutator) {
+    this.bufferedMutator = bufferedMutator;
+  }
+
+  public TableName getName() {
+    return bufferedMutator.getName();
+  }
+
+  public Configuration getConfiguration() {
+    return bufferedMutator.getConfiguration();
+  }
+
+  public void mutate(Mutation mutation) throws IOException {
+    bufferedMutator.mutate(mutation);
+  }
+
+  public void mutate(List<? extends Mutation> mutations) throws IOException {
+    bufferedMutator.mutate(mutations);
+  }
+
+  public void close() throws IOException {
+    bufferedMutator.close();
+  }
+
+  public void flush() throws IOException {
+    bufferedMutator.flush();
+  }
+
+  public long getWriteBufferSize() {
+    return bufferedMutator.getWriteBufferSize();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/42ed0625/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Column.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Column.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Column.java
new file mode 100644
index 0000000..3397d62
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Column.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.client.Result;
+
+/**
+ * A Column represents the way to store a fully qualified column in a specific
+ * table.
+ */
+public interface Column<T> {
+
+  /**
+   * Sends a Mutation to the table. The mutations will be buffered and sent over
+   * the wire as part of a batch.
+   *
+   * @param rowKey identifying the row to write. Nothing gets written when null.
+   * @param tableMutator used to modify the underlying HBase table. Caller is
+   *          responsible to pass a mutator for the table that actually has this
+   *          column.
+   * @param timestamp version timestamp. When null the server timestamp will be
+   *          used.
+   * @param inputValue the value to write to the rowKey and column qualifier.
+   *          Nothing gets written when null.
+   * @throws IOException
+   */
+  public void store(byte[] rowKey, TypedBufferedMutator<T> tableMutator,
+      Long timestamp, Object inputValue) throws IOException;
+
+  /**
+   * Get the latest version of this specified column. Note: this call clones the
+   * value content of the hosting {@link Cell}.
+   *
+   * @param result Cannot be null
+   * @return result object (can be cast to whatever object was written to), or
+   *         null when result doesn't contain this column.
+   * @throws IOException
+   */
+  public Object readResult(Result result) throws IOException;
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/42ed0625/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnFamily.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnFamily.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnFamily.java
new file mode 100644
index 0000000..c84c016
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnFamily.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+/**
+ * Type safe column family. The type parameter ties a column family to the
+ * table it belongs to.
+ *
+ * @param <T> refers to the table for which this column family is used.
+ */
+public interface ColumnFamily<T> {
+
+  /**
+   * Get the byte representation of this column family. Keep a local copy if
+   * you need to avoid overhead of repeated cloning.
+   *
+   * @return a clone of the byte representation of the column family.
+   */
+  public byte[] getBytes();
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/42ed0625/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java
new file mode 100644
index 0000000..6a204dc
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java
@@ -0,0 +1,247 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
+
+/**
+ * This class is meant to be used only by explicit Columns, and not directly to
+ * write by clients. All reads and writes performed through an instance are
+ * scoped to the single column family it was constructed with.
+ *
+ * @param <T> refers to the table.
+ */
+public class ColumnHelper<T> {
+
+  private final ColumnFamily<T> columnFamily;
+
+  /**
+   * Local copy of bytes representation of columnFamily so that we can avoid
+   * cloning a new copy over and over.
+   */
+  private final byte[] columnFamilyBytes;
+
+  /**
+   * @param columnFamily the column family that every read and write done
+   *          through this helper targets.
+   */
+  public ColumnHelper(ColumnFamily<T> columnFamily) {
+    this.columnFamily = columnFamily;
+    columnFamilyBytes = columnFamily.getBytes();
+  }
+
+  /**
+   * Sends a Mutation to the table. The mutations will be buffered and sent
+   * over the wire as part of a batch.
+   *
+   * @param rowKey identifying the row to write. Nothing gets written when
+   *          null.
+   * @param tableMutator used to modify the underlying HBase table
+   * @param columnQualifier column qualifier. Nothing gets written when null.
+   * @param timestamp version timestamp. When null the server timestamp will be
+   *          used.
+   * @param inputValue the value to write to the rowKey and column qualifier.
+   *          Nothing gets written when null.
+   * @throws IOException if the write cannot be completed.
+   */
+  public void store(byte[] rowKey, TypedBufferedMutator<?> tableMutator,
+      byte[] columnQualifier, Long timestamp, Object inputValue)
+      throws IOException {
+    if (rowKey == null || columnQualifier == null || inputValue == null) {
+      return;
+    }
+    Put p = new Put(rowKey);
+
+    // Serialize once; both addColumn variants take the same value bytes.
+    byte[] valueBytes = GenericObjectMapper.write(inputValue);
+    if (timestamp == null) {
+      // No explicit version: use the addColumn variant without a timestamp.
+      p.addColumn(columnFamilyBytes, columnQualifier, valueBytes);
+    } else {
+      p.addColumn(columnFamilyBytes, columnQualifier, timestamp, valueBytes);
+    }
+    tableMutator.mutate(p);
+  }
+
+  /**
+   * @return the column family for this column implementation.
+   */
+  public ColumnFamily<T> getColumnFamily() {
+    return columnFamily;
+  }
+
+  /**
+   * Get the latest version of this specified column. Note: this call clones
+   * the value content of the hosting {@link Cell}.
+   *
+   * @param result from which to read the value. When null, null is returned.
+   * @param columnQualifierBytes referring to the column to be read. When
+   *          null, null is returned.
+   * @return latest version of the specified column of whichever object was
+   *         written.
+   * @throws IOException if the stored value cannot be read back.
+   */
+  public Object readResult(Result result, byte[] columnQualifierBytes)
+      throws IOException {
+    if (result == null || columnQualifierBytes == null) {
+      return null;
+    }
+
+    // Would have preferred to be able to use getValueAsByteBuffer and get a
+    // ByteBuffer to avoid copy, but GenericObjectMapper doesn't seem to like
+    // that.
+    byte[] value = result.getValue(columnFamilyBytes, columnQualifierBytes);
+    return GenericObjectMapper.read(value);
+  }
+
+  /**
+   * Reads every stored version of the matching columns in this column family.
+   *
+   * @param result from which to read timeseries data. When null an empty map
+   *          is returned.
+   * @param columnPrefixBytes optional prefix to limit columns. If null all
+   *          columns are returned.
+   * @return the cell values at each respective time in the form
+   *         {idA={timestamp1->value1, timestamp2->value2},
+   *         idB={timestamp3->value3}, idC={timestamp1->value4}}. Never null.
+   * @throws IOException if a stored value cannot be read back.
+   */
+  public NavigableMap<String, NavigableMap<Long, Number>>
+      readTimeseriesResults(Result result, byte[] columnPrefixBytes)
+      throws IOException {
+
+    NavigableMap<String, NavigableMap<Long, Number>> results =
+        new TreeMap<String, NavigableMap<Long, Number>>();
+    if (result == null) {
+      return results;
+    }
+
+    // Result.getMap() yields null rather than an empty map when the Result
+    // holds no cells, so guard before dereferencing it.
+    NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>
+        resultMap = result.getMap();
+    if (resultMap == null) {
+      return results;
+    }
+
+    // could be that there is no such column family.
+    NavigableMap<byte[], NavigableMap<Long, byte[]>> columnCellMap =
+        resultMap.get(columnFamilyBytes);
+    if (columnCellMap == null) {
+      return results;
+    }
+
+    for (Entry<byte[], NavigableMap<Long, byte[]>> entry
+        : columnCellMap.entrySet()) {
+      String columnName = decodeColumnName(entry.getKey(), columnPrefixBytes);
+      if (columnName == null) {
+        // This column does not have the prefix we want.
+        continue;
+      }
+
+      NavigableMap<Long, Number> cellResults = new TreeMap<Long, Number>();
+      NavigableMap<Long, byte[]> cells = entry.getValue();
+      if (cells != null) {
+        for (Entry<Long, byte[]> cell : cells.entrySet()) {
+          Number value = (Number) GenericObjectMapper.read(cell.getValue());
+          cellResults.put(cell.getKey(), value);
+        }
+      }
+      results.put(columnName, cellResults);
+    }
+    return results;
+  }
+
+  /**
+   * Reads the latest value of the matching columns in this column family.
+   * Columns with a null or empty qualifier are skipped.
+   *
+   * @param result from which to read columns. When null an empty map is
+   *          returned.
+   * @param columnPrefixBytes optional prefix to limit columns. If null all
+   *          columns are returned.
+   * @return the latest values of columns in the column family. Never null.
+   * @throws IOException if a stored value cannot be read back.
+   */
+  public Map<String, Object> readResults(Result result,
+      byte[] columnPrefixBytes) throws IOException {
+    Map<String, Object> results = new HashMap<String, Object>();
+    if (result == null) {
+      return results;
+    }
+
+    // Result.getFamilyMap() yields null when the Result holds no cells, so
+    // guard before iterating.
+    Map<byte[], byte[]> columns = result.getFamilyMap(columnFamilyBytes);
+    if (columns == null) {
+      return results;
+    }
+
+    for (Entry<byte[], byte[]> entry : columns.entrySet()) {
+      byte[] qualifierBytes = entry.getKey();
+      if (qualifierBytes == null || qualifierBytes.length == 0) {
+        continue;
+      }
+
+      String columnName = decodeColumnName(qualifierBytes, columnPrefixBytes);
+      // Only keep columns that have the prefix we want.
+      if (columnName != null) {
+        results.put(columnName, GenericObjectMapper.read(entry.getValue()));
+      }
+    }
+    return results;
+  }
+
+  /**
+   * Turns a stored column qualifier back into a column name, provided the
+   * qualifier carries the requested prefix.
+   *
+   * @param qualifierBytes column qualifier as read from the result.
+   * @param columnPrefixBytes prefix to match, or null to accept every column.
+   * @return the decoded column name, or null when the qualifier does not
+   *         consist of exactly the requested prefix plus a remainder.
+   */
+  private static String decodeColumnName(byte[] qualifierBytes,
+      byte[] columnPrefixBytes) {
+    if (columnPrefixBytes == null) {
+      // Decode the spaces we encoded in the column name.
+      return Separator.decode(qualifierBytes, Separator.SPACE);
+    }
+
+    // A non-null prefix means columns are actually of the form
+    // prefix!columnNameRemainder
+    byte[][] columnNameParts = Separator.QUALIFIERS.split(qualifierBytes, 2);
+    // Check the part count before indexing into the array.
+    if (columnNameParts.length != 2
+        || !Bytes.equals(columnPrefixBytes, columnNameParts[0])) {
+      return null;
+    }
+
+    // This is the prefix that we want.
+    // NOTE(review): getColumnQualifier() space-encodes the qualifier with and
+    // without a prefix, yet only the unprefixed branch above passes
+    // Separator.SPACE to decode. Confirm whether this branch should too.
+    return Separator.decode(columnNameParts[1]);
+  }
+
+  /**
+   * @param columnPrefixBytes The byte representation for the column prefix.
+   *          Should not contain {@link Separator#QUALIFIERS}.
+   * @param qualifier for the remainder of the column. Spaces in the qualifier
+   *          are encoded. NOTE(review): earlier documentation stated that any
+   *          {@link Separator#QUALIFIERS} is encoded as well; only the
+   *          {@link Separator#SPACE} encoding is visible in this method, so
+   *          confirm against the join implementation.
+   * @return fully sanitized column qualifier that is a combination of prefix
+   *         and qualifier. If prefix is null, the result is simply the encoded
+   *         qualifier without any separator.
+   */
+  public static byte[] getColumnQualifier(byte[] columnPrefixBytes,
+      String qualifier) {
+
+    // We don't want column names to have spaces
+    byte[] encodedQualifier = Bytes.toBytes(Separator.SPACE.encode(qualifier));
+    if (columnPrefixBytes == null) {
+      return encodedQualifier;
+    }
+
+    // Tag the column prefix onto the encoded qualifier, joined by the
+    // QUALIFIERS separator.
+    return Separator.QUALIFIERS.join(columnPrefixBytes, encodedQualifier);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/42ed0625/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java
new file mode 100644
index 0000000..2eedea0
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.NavigableMap;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.client.Result;
+
+/**
+ * Used to represent a partially qualified column, where the actual column name
+ * will be composed of a prefix and the remainder of the column qualifier. The
+ * prefix can be null, in which case the column qualifier will be completely
+ * determined when the values are stored.
+ *
+ * @param <T> refers to the table.
+ */
+public interface ColumnPrefix<T> {
+
+  /**
+   * Sends a Mutation to the table. The mutations will be buffered and sent
+   * over the wire as part of a batch.
+   *
+   * @param rowKey identifying the row to write. Nothing gets written when
+   *          null.
+   * @param tableMutator used to modify the underlying HBase table. Caller is
+   *          responsible to pass a mutator for the table that actually has
+   *          this column.
+   * @param qualifier column qualifier. Nothing gets written when null.
+   * @param timestamp version timestamp. When null the server timestamp will be
+   *          used.
+   * @param inputValue the value to write to the rowKey and column qualifier.
+   *          Nothing gets written when null.
+   * @throws IOException if the write cannot be completed.
+   */
+  public void store(byte[] rowKey, TypedBufferedMutator<T> tableMutator,
+      String qualifier, Long timestamp, Object inputValue) throws IOException;
+
+  /**
+   * Get the latest version of this specified column. Note: this call clones
+   * the value content of the hosting {@link Cell}.
+   *
+   * @param result Cannot be null
+   * @param qualifier column qualifier. Nothing gets read when null.
+   * @return result object (can be cast to whatever object was written to) or
+   *         null when specified column qualifier for this prefix doesn't exist
+   *         in the result.
+   * @throws IOException if the stored value cannot be read back.
+   */
+  public Object readResult(Result result, String qualifier) throws IOException;
+
+  /**
+   * Reads the latest value of every column that has this prefix.
+   *
+   * @param result from which to read columns
+   * @return the latest values of columns in the column family with this prefix
+   *         (or all of them if the prefix value is null).
+   * @throws IOException if a stored value cannot be read back.
+   */
+  public Map<String, Object> readResults(Result result) throws IOException;
+
+  /**
+   * Reads every stored version of the columns that have this prefix.
+   *
+   * @param result from which to read timeseries data
+   * @return the cell values at each respective time in the form
+   *         {idA={timestamp1->value1, timestamp2->value2},
+   *         idB={timestamp3->value3}, idC={timestamp1->value4}}
+   * @throws IOException if a stored value cannot be read back.
+   */
+  public NavigableMap<String, NavigableMap<Long, Number>>
+      readTimeseriesResults(Result result) throws IOException;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/42ed0625/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Range.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Range.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Range.java
new file mode 100644
index 0000000..2cb6c08
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Range.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * An immutable pair of indexes describing the half-open interval that starts
+ * at {@link #start()} and stops just before {@link #end()}.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class Range {
+  private final int lowerBound;
+  private final int upperBound;
+
+  /**
+   * Defines a range from start index (inclusive) to end index (exclusive).
+   *
+   * @param start
+   *          Starting index position
+   * @param end
+   *          Ending index position (exclusive)
+   * @throws IllegalArgumentException unless {@code 0 <= start <= end}.
+   */
+  public Range(int start, int end) {
+    boolean wellFormed = (0 <= start) && (start <= end);
+    if (!wellFormed) {
+      StringBuilder message = new StringBuilder(
+          "Invalid range, required that: 0 <= start <= end; start=");
+      message.append(start).append(", end=").append(end);
+      throw new IllegalArgumentException(message.toString());
+    }
+
+    this.lowerBound = start;
+    this.upperBound = end;
+  }
+
+  /**
+   * @return the starting index (inclusive).
+   */
+  public int start() {
+    return lowerBound;
+  }
+
+  /**
+   * @return the ending index (exclusive).
+   */
+  public int end() {
+    return upperBound;
+  }
+
+  /**
+   * @return the number of positions covered, i.e. end minus start.
+   */
+  public int length() {
+    int span = upperBound - lowerBound;
+    return span;
+  }
+}
\ No newline at end of file

Reply via email to