Repository: incubator-metron
Updated Branches:
  refs/heads/master c64e8ad03 -> 3a2ecc404


METRON-419 Update Tuple to HBase Mapper/Bolt to Set TTL (nickwallen) closes 
apache/incubator-metron#252


Project: http://git-wip-us.apache.org/repos/asf/incubator-metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-metron/commit/3a2ecc40
Tree: http://git-wip-us.apache.org/repos/asf/incubator-metron/tree/3a2ecc40
Diff: http://git-wip-us.apache.org/repos/asf/incubator-metron/diff/3a2ecc40

Branch: refs/heads/master
Commit: 3a2ecc404bdd8e6047dbe7199dee45192aa94428
Parents: c64e8ad
Author: nickwallen <n...@nickallen.org>
Authored: Fri Sep 16 12:39:16 2016 -0400
Committer: Nick Allen <n...@nickallen.org>
Committed: Fri Sep 16 12:39:16 2016 -0400

----------------------------------------------------------------------
 .../metron/profiler/client/ProfileWriter.java   |   3 +-
 .../profiler/bolt/ProfileHBaseMapper.java       |  23 ++-
 .../metron/profiler/hbase/ColumnBuilder.java    |   2 +-
 .../profiler/hbase/ValueOnlyColumnBuilder.java  |   2 +-
 metron-platform/metron-hbase/pom.xml            |  16 --
 .../org/apache/metron/hbase/bolt/HBaseBolt.java |  26 +--
 .../metron/hbase/bolt/mapper/ColumnList.java    | 178 +++++++++++++++++++
 .../metron/hbase/bolt/mapper/HBaseMapper.java   |  56 ++++++
 .../bolt/mapper/HBaseProjectionCriteria.java    |  92 ++++++++++
 .../metron/hbase/bolt/mapper/IColumn.java       |  35 ++++
 .../metron/hbase/bolt/mapper/ICounter.java      |  34 ++++
 .../apache/metron/hbase/client/HBaseClient.java |   6 +-
 .../org/apache/metron/hbase/WidgetMapper.java   |  22 ++-
 .../apache/metron/hbase/bolt/HBaseBoltTest.java |  45 +++--
 .../metron/hbase/client/HBaseClientTest.java    |   6 +-
 15 files changed, 492 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3a2ecc40/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/ProfileWriter.java
----------------------------------------------------------------------
diff --git 
a/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/ProfileWriter.java
 
b/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/ProfileWriter.java
index eb88b5a..0ad2f44 100644
--- 
a/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/ProfileWriter.java
+++ 
b/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/ProfileWriter.java
@@ -22,13 +22,12 @@ package org.apache.metron.profiler.client;
 
 import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.metron.hbase.bolt.mapper.ColumnList;
 import org.apache.metron.hbase.client.HBaseClient;
 import org.apache.metron.profiler.ProfileMeasurement;
 import org.apache.metron.profiler.ProfilePeriod;
 import org.apache.metron.profiler.hbase.ColumnBuilder;
 import org.apache.metron.profiler.hbase.RowKeyBuilder;
-import org.apache.storm.hbase.common.ColumnList;
 
 import java.util.List;
 import java.util.concurrent.TimeUnit;

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3a2ecc40/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileHBaseMapper.java
----------------------------------------------------------------------
diff --git 
a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileHBaseMapper.java
 
b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileHBaseMapper.java
index b0b33cc..2c8cb67 100644
--- 
a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileHBaseMapper.java
+++ 
b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileHBaseMapper.java
@@ -22,18 +22,21 @@ package org.apache.metron.profiler.bolt;
 
 import backtype.storm.tuple.Tuple;
 import org.apache.commons.beanutils.BeanMap;
+import org.apache.commons.lang.StringUtils;
+import org.apache.metron.common.configuration.profiler.ProfileConfig;
 import org.apache.metron.common.dsl.ParseException;
+import org.apache.metron.hbase.bolt.mapper.HBaseMapper;
 import org.apache.metron.profiler.ProfileMeasurement;
 import org.apache.metron.profiler.hbase.ColumnBuilder;
 import org.apache.metron.profiler.hbase.SaltyRowKeyBuilder;
 import org.apache.metron.profiler.hbase.ValueOnlyColumnBuilder;
 import org.apache.metron.profiler.hbase.RowKeyBuilder;
 import org.apache.metron.profiler.stellar.StellarExecutor;
-import org.apache.storm.hbase.bolt.mapper.HBaseMapper;
-import org.apache.storm.hbase.common.ColumnList;
+import org.apache.metron.hbase.bolt.mapper.ColumnList;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
 
 import static java.lang.String.format;
 import static org.apache.commons.collections.CollectionUtils.isEmpty;
@@ -70,7 +73,7 @@ public class ProfileHBaseMapper implements HBaseMapper {
 
   /**
    * Define the row key for a ProfileMeasurement.
-   * @param tuple The tuple containing a ProfileMeasurement.
+   * @param tuple The tuple to map to Hbase.
    * @return The Hbase row key.
    */
   @Override
@@ -82,7 +85,7 @@ public class ProfileHBaseMapper implements HBaseMapper {
 
   /**
    * Defines how the fields within a ProfileMeasurement are mapped to HBase.
-   * @param tuple The tuple containing the ProfileMeasurement.
+   * @param tuple The tuple to map to Hbase.
    */
   @Override
   public ColumnList columns(Tuple tuple) {
@@ -91,6 +94,18 @@ public class ProfileHBaseMapper implements HBaseMapper {
   }
 
   /**
+   * The time-to-live can be defined differently for each profile.
+   * @param tuple The tuple to map to Hbase.
+   * @return An empty Optional; TTL is not yet supported for profiles.
+   */
+  @Override
+  public Optional<Long> getTTL(Tuple tuple) {
+    // TTL not yet supported for profiles
+    Optional result = Optional.empty();
+    return result;
+  }
+
+  /**
    * Executes each of the 'groupBy' expressions.  The result of each
    * expression are the groups used to sort the data as part of the
    * row key.

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3a2ecc40/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/hbase/ColumnBuilder.java
----------------------------------------------------------------------
diff --git 
a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/hbase/ColumnBuilder.java
 
b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/hbase/ColumnBuilder.java
index 44bf129..c645822 100644
--- 
a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/hbase/ColumnBuilder.java
+++ 
b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/hbase/ColumnBuilder.java
@@ -20,8 +20,8 @@
 
 package org.apache.metron.profiler.hbase;
 
+import org.apache.metron.hbase.bolt.mapper.ColumnList;
 import org.apache.metron.profiler.ProfileMeasurement;
-import org.apache.storm.hbase.common.ColumnList;
 
 import java.io.Serializable;
 

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3a2ecc40/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/hbase/ValueOnlyColumnBuilder.java
----------------------------------------------------------------------
diff --git 
a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/hbase/ValueOnlyColumnBuilder.java
 
b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/hbase/ValueOnlyColumnBuilder.java
index cc6aa5a..bb1baf6 100644
--- 
a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/hbase/ValueOnlyColumnBuilder.java
+++ 
b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/hbase/ValueOnlyColumnBuilder.java
@@ -23,7 +23,7 @@ package org.apache.metron.profiler.hbase;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.metron.common.utils.SerDeUtils;
 import org.apache.metron.profiler.ProfileMeasurement;
-import org.apache.storm.hbase.common.ColumnList;
+import org.apache.metron.hbase.bolt.mapper.ColumnList;
 
 /**
  * A ColumnBuilder that writes only the value of a ProfileMeasurement.

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3a2ecc40/metron-platform/metron-hbase/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-hbase/pom.xml 
b/metron-platform/metron-hbase/pom.xml
index 330b52c..c346325 100644
--- a/metron-platform/metron-hbase/pom.xml
+++ b/metron-platform/metron-hbase/pom.xml
@@ -115,22 +115,6 @@
         </dependency>
         <dependency>
             <groupId>org.apache.storm</groupId>
-            <artifactId>storm-hbase</artifactId>
-            <version>${global_storm_version}</version>
-            <scope>provided</scope>
-            <exclusions>
-                <exclusion>
-                    <artifactId>servlet-api</artifactId>
-                    <groupId>javax.servlet</groupId>
-                </exclusion>
-                <exclusion>
-                    <artifactId>log4j-over-slf4j</artifactId>
-                    <groupId>org.slf4j</groupId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.storm</groupId>
             <artifactId>storm-hdfs</artifactId>
             <version>${global_storm_version}</version>
             <scope>provided</scope>

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3a2ecc40/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/bolt/HBaseBolt.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/bolt/HBaseBolt.java
 
b/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/bolt/HBaseBolt.java
index a2da837..ede1d8c 100644
--- 
a/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/bolt/HBaseBolt.java
+++ 
b/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/bolt/HBaseBolt.java
@@ -29,19 +29,17 @@ import backtype.storm.tuple.Tuple;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.client.Durability;
-import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.metron.hbase.HTableProvider;
 import org.apache.metron.hbase.TableProvider;
+import org.apache.metron.hbase.bolt.mapper.ColumnList;
+import org.apache.metron.hbase.bolt.mapper.HBaseMapper;
 import org.apache.metron.hbase.client.HBaseClient;
-import org.apache.storm.hbase.bolt.mapper.HBaseMapper;
-import org.apache.storm.hbase.common.ColumnList;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.lang.reflect.InvocationTargetException;
-import java.util.LinkedList;
-import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 /**
  * A bolt that writes to HBase.
@@ -158,10 +156,17 @@ public class HBaseBolt extends BaseRichBolt {
    * @param tuple Contains the data elements that need written to HBase.
    */
   private void save(Tuple tuple) {
-    byte[] rowKey = this.mapper.rowKey(tuple);
-    ColumnList cols = this.mapper.columns(tuple);
+    byte[] rowKey = mapper.rowKey(tuple);
+    ColumnList cols = mapper.columns(tuple);
     Durability durability = writeToWAL ? Durability.SYNC_WAL : 
Durability.SKIP_WAL;
-    hbaseClient.addMutation(rowKey, cols, durability);
+
+    Optional<Long> ttl = mapper.getTTL(tuple);
+    if(ttl.isPresent()) {
+      hbaseClient.addMutation(rowKey, cols, durability, ttl.get());
+    } else {
+      hbaseClient.addMutation(rowKey, cols, durability);
+    }
+
     batchHelper.addBatch(tuple);
   }
 
@@ -174,9 +179,8 @@ public class HBaseBolt extends BaseRichBolt {
   }
 
   /**
-   *
-   * @param connectorImpl
-   * @return
+   * Creates a TableProvider based on a class name.
+   * @param connectorImpl The class name of a TableProvider
    */
   private static TableProvider getTableProvider(String connectorImpl) {
 

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3a2ecc40/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/bolt/mapper/ColumnList.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/bolt/mapper/ColumnList.java
 
b/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/bolt/mapper/ColumnList.java
new file mode 100644
index 0000000..ad4f8d6
--- /dev/null
+++ 
b/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/bolt/mapper/ColumnList.java
@@ -0,0 +1,178 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.metron.hbase.bolt.mapper;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Represents a list of HBase columns.
+ *
+ * There are two types of columns, <i>standard</i> and <i>counter</i>.
+ *
+ * Standard columns have <i>column family</i> (required), <i>qualifier</i> 
(optional),
+ * <i>timestamp</i> (optional), and <i>value</i> (optional) values.
+ *
+ * Counter columns have <i>column family</i> (required), <i>qualifier</i> 
(optional),
+ * and <i>increment</i> (optional, but recommended) values.
+ *
+ * Inserts/Updates can be added via the <code>addColumn()</code> and 
<code>addCounter()</code>
+ * methods.
+ *
+ * Original code based on the Apache Storm project. See
+ * https://github.com/apache/storm/tree/master/external/storm-hbase.
+ */
+public class ColumnList {
+
+  public static abstract class AbstractColumn {
+    byte[] family, qualifier;
+
+    AbstractColumn(byte[] family, byte[] qualifier){
+      this.family = family;
+      this.qualifier = qualifier;
+    }
+
+    public byte[] getFamily() {
+      return family;
+    }
+
+    public byte[] getQualifier() {
+      return qualifier;
+    }
+  }
+
+  public static class Column extends AbstractColumn {
+    byte[] value;
+    long ts = -1;
+
+    Column(byte[] family, byte[] qualifier, long ts, byte[] value){
+      super(family, qualifier);
+      this.value = value;
+      this.ts = ts;
+    }
+
+    public byte[] getValue() {
+      return value;
+    }
+
+    public long getTs() {
+      return ts;
+    }
+  }
+
+  public static class Counter extends AbstractColumn {
+    long incr = 0;
+    Counter(byte[] family, byte[] qualifier, long incr){
+      super(family, qualifier);
+      this.incr = incr;
+    }
+
+    public long getIncrement() {
+      return incr;
+    }
+  }
+
+  private ArrayList<ColumnList.Column> columns;
+  private ArrayList<ColumnList.Counter> counters;
+
+  private ArrayList<Column> columns(){
+    if(this.columns == null){
+      this.columns = new ArrayList<>();
+    }
+    return this.columns;
+  }
+
+  private ArrayList<Counter> counters(){
+    if(this.counters == null){
+      this.counters = new ArrayList<>();
+    }
+    return this.counters;
+  }
+
+  /**
+   * Add a standard HBase column.
+   */
+  public ColumnList addColumn(byte[] family, byte[] qualifier, long ts, byte[] 
value){
+    columns().add(new Column(family, qualifier, ts, value));
+    return this;
+  }
+
+  /**
+   * Add a standard HBase column
+   */
+  public ColumnList addColumn(byte[] family, byte[] qualifier, byte[] value){
+    columns().add(new Column(family, qualifier, -1, value));
+    return this;
+  }
+
+  /**
+   * Add a standard HBase column given an instance of a class that implements
+   * the <code>IColumn</code> interface.
+   */
+  public ColumnList addColumn(IColumn column){
+    return this.addColumn(column.family(), column.qualifier(), 
column.timestamp(), column.value());
+  }
+
+  /**
+   * Add an HBase counter column.
+   */
+  public ColumnList addCounter(byte[] family, byte[] qualifier, long incr){
+    counters().add(new Counter(family, qualifier, incr));
+    return this;
+  }
+
+  /**
+   * Add an HBase counter column given an instance of a class that implements 
the
+   * <code>ICounter</code> interface.
+   */
+  public ColumnList addCounter(ICounter counter){
+    return this.addCounter(counter.family(), counter.qualifier(), 
counter.increment());
+  }
+
+
+  /**
+   * Query to determine if we have column definitions.
+   */
+  public boolean hasColumns(){
+    return this.columns != null;
+  }
+
+  /**
+   * Query to determine if we have counter definitions.
+   */
+  public boolean hasCounters(){
+    return this.counters != null;
+  }
+
+  /**
+   * Get the list of column definitions, or <code>null</code> if none were added.
+   */
+  public List<Column> getColumns(){
+    return this.columns;
+  }
+
+  /**
+   * Get the list of counter definitions, or <code>null</code> if none were added.
+   */
+  public List<Counter> getCounters(){
+    return this.counters;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3a2ecc40/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/bolt/mapper/HBaseMapper.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/bolt/mapper/HBaseMapper.java
 
b/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/bolt/mapper/HBaseMapper.java
new file mode 100644
index 0000000..e662c76
--- /dev/null
+++ 
b/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/bolt/mapper/HBaseMapper.java
@@ -0,0 +1,56 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.metron.hbase.bolt.mapper;
+
+import backtype.storm.tuple.Tuple;
+import java.io.Serializable;
+import java.util.Optional;
+
+/**
+ * Maps a <code>backtype.storm.tuple.Tuple</code> object
+ * to a row in an HBase table.
+ *
+ * Original code based on the Apache Storm project. See
+ * https://github.com/apache/storm/tree/master/external/storm-hbase.
+ */
+public interface HBaseMapper extends Serializable {
+
+  /**
+   * Given a tuple, return the HBase rowkey.
+   *
+   * @param tuple The tuple to map to Hbase
+   */
+  byte[] rowKey(Tuple tuple);
+
+  /**
+   * Given a tuple, return a list of HBase columns to insert.
+   *
+   * @param tuple The tuple to map to Hbase
+   */
+  ColumnList columns(Tuple tuple);
+
+  /**
+   * Given a tuple, return the time to live in milliseconds, if one applies.
+   *
+   * @param tuple The tuple to map to Hbase
+   */
+  Optional<Long> getTTL(Tuple tuple);
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3a2ecc40/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/bolt/mapper/HBaseProjectionCriteria.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/bolt/mapper/HBaseProjectionCriteria.java
 
b/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/bolt/mapper/HBaseProjectionCriteria.java
new file mode 100644
index 0000000..d3c8237
--- /dev/null
+++ 
b/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/bolt/mapper/HBaseProjectionCriteria.java
@@ -0,0 +1,92 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.metron.hbase.bolt.mapper;
+
+import com.google.common.collect.Lists;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Allows the user to specify the projection criteria.
+ * If only columnFamily is specified all columns from that family will be 
returned.
+ * If a column is specified only that column from that family will be returned.
+ *
+ * Original code based on the Apache Storm project. See
+ * https://github.com/apache/storm/tree/master/external/storm-hbase.
+ */
+public class HBaseProjectionCriteria implements Serializable {
+
+  private List<byte[]> columnFamilies;
+  private List<ColumnMetaData> columns;
+
+  public static class ColumnMetaData implements Serializable {
+
+    private byte[]  columnFamily;
+    private byte[] qualifier;
+
+    public ColumnMetaData(String columnFamily, String qualifier) {
+      this.columnFamily = columnFamily.getBytes();
+      this.qualifier = qualifier.getBytes();
+    }
+
+    public byte[] getColumnFamily() {
+      return columnFamily;
+    }
+
+    public byte[] getQualifier() {
+      return qualifier;
+    }
+  }
+
+  public HBaseProjectionCriteria() {
+    columnFamilies = Lists.newArrayList();
+    columns = Lists.newArrayList();
+  }
+
+  /**
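+   * All columns from this family will be included in the result of the HBase lookup.
+   * @param columnFamily The name of the column family to include.
+   * @return This criteria object, so that calls can be chained.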
+   */
+  public HBaseProjectionCriteria addColumnFamily(String columnFamily) {
+    this.columnFamilies.add(columnFamily.getBytes());
+    return this;
+  }
+
+  /**
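+   * Only this column from the columnFamily will be included in the result of the 
HBase lookup.
+   * @param column The column family and qualifier to include.
+   * @return This criteria object, so that calls can be chained.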
+   */
+  public HBaseProjectionCriteria addColumn(ColumnMetaData column) {
+    this.columns.add(column);
+    return this;
+  }
+
+  public List<ColumnMetaData> getColumns() {
+    return columns;
+  }
+
+  public List<byte[]> getColumnFamilies() {
+    return columnFamilies;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3a2ecc40/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/bolt/mapper/IColumn.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/bolt/mapper/IColumn.java
 
b/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/bolt/mapper/IColumn.java
new file mode 100644
index 0000000..d5749ad
--- /dev/null
+++ 
b/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/bolt/mapper/IColumn.java
@@ -0,0 +1,35 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.metron.hbase.bolt.mapper;
+
+/**
+ * Interface definition for classes that support being written to HBase as
+ * a regular column.
+ *
+ * Original code based on the Apache Storm project. See
+ * https://github.com/apache/storm/tree/master/external/storm-hbase.
+ */
+public interface IColumn {
+  byte[] family();
+  byte[] qualifier();
+  byte[] value();
+  long timestamp();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3a2ecc40/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/bolt/mapper/ICounter.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/bolt/mapper/ICounter.java
 
b/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/bolt/mapper/ICounter.java
new file mode 100644
index 0000000..03fc9fe
--- /dev/null
+++ 
b/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/bolt/mapper/ICounter.java
@@ -0,0 +1,34 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.metron.hbase.bolt.mapper;
+
+/**
+ * Interface definition for classes that support being written to HBase as
+ * a counter column.
+ *
+ * Original code based on the Apache Storm project. See
+ * https://github.com/apache/storm/tree/master/external/storm-hbase.
+ */
+public interface ICounter {
+  byte[] family();
+  byte[] qualifier();
+  long increment();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3a2ecc40/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/client/HBaseClient.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/client/HBaseClient.java
 
b/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/client/HBaseClient.java
index e078d50..ff4903d 100644
--- 
a/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/client/HBaseClient.java
+++ 
b/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/client/HBaseClient.java
@@ -29,8 +29,8 @@ import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.metron.hbase.TableProvider;
-import org.apache.storm.hbase.bolt.mapper.HBaseProjectionCriteria;
-import org.apache.storm.hbase.common.ColumnList;
+import org.apache.metron.hbase.bolt.mapper.ColumnList;
+import org.apache.metron.hbase.bolt.mapper.HBaseProjectionCriteria;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -106,7 +106,7 @@ public class HBaseClient implements Closeable {
    * @param durability       The durability of the mutation.
    * @param timeToLiveMillis The time to live in milliseconds.
    */
-  public void addMutation(byte[] rowKey, ColumnList cols, Durability 
durability, long timeToLiveMillis) {
+  public void addMutation(byte[] rowKey, ColumnList cols, Durability 
durability, Long timeToLiveMillis) {
 
     if (cols.hasColumns()) {
       Put put = createPut(rowKey, cols, durability, timeToLiveMillis);

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3a2ecc40/metron-platform/metron-hbase/src/test/java/org/apache/metron/hbase/WidgetMapper.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-hbase/src/test/java/org/apache/metron/hbase/WidgetMapper.java
 
b/metron-platform/metron-hbase/src/test/java/org/apache/metron/hbase/WidgetMapper.java
index c7d3fae..4f97a5b 100644
--- 
a/metron-platform/metron-hbase/src/test/java/org/apache/metron/hbase/WidgetMapper.java
+++ 
b/metron-platform/metron-hbase/src/test/java/org/apache/metron/hbase/WidgetMapper.java
@@ -22,16 +22,27 @@ package org.apache.metron.hbase;
 
 import backtype.storm.tuple.Tuple;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.storm.hbase.bolt.mapper.HBaseMapper;
-import org.apache.storm.hbase.common.ColumnList;
+import org.apache.metron.hbase.bolt.mapper.ColumnList;
+import org.apache.metron.hbase.bolt.mapper.HBaseMapper;
+
+import java.util.Optional;
 
-import java.util.Calendar;
 
 /**
  * Maps a Widget to HBase.  Used only for testing.
  */
 public class WidgetMapper implements HBaseMapper {
 
+  private Optional<Long> ttl;
+
+  public WidgetMapper() {
+    this.ttl = Optional.empty();
+  }
+
+  public WidgetMapper(Long ttl) {
+    this.ttl = Optional.of(ttl);
+  }
+
   @Override
   public byte[] rowKey(Tuple tuple) {
     Widget w = (Widget) tuple.getValueByField("widget");
@@ -48,6 +59,11 @@ public class WidgetMapper implements HBaseMapper {
     return cols;
   }
 
+  @Override
+  public Optional<Long> getTTL(Tuple tuple) {
+    return ttl;
+  }
+
   public static final String CF_STRING = "cfWidget";
   public static final byte[] CF = Bytes.toBytes(CF_STRING);
   public static final byte[] QNAME = Bytes.toBytes("qName");

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3a2ecc40/metron-platform/metron-hbase/src/test/java/org/apache/metron/hbase/bolt/HBaseBoltTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-hbase/src/test/java/org/apache/metron/hbase/bolt/HBaseBoltTest.java
 
b/metron-platform/metron-hbase/src/test/java/org/apache/metron/hbase/bolt/HBaseBoltTest.java
index 8b39aaa..621720e 100644
--- 
a/metron-platform/metron-hbase/src/test/java/org/apache/metron/hbase/bolt/HBaseBoltTest.java
+++ 
b/metron-platform/metron-hbase/src/test/java/org/apache/metron/hbase/bolt/HBaseBoltTest.java
@@ -26,13 +26,13 @@ import org.apache.metron.hbase.Widget;
 import org.apache.metron.hbase.WidgetMapper;
 import org.apache.metron.hbase.client.HBaseClient;
 import org.apache.metron.test.bolt.BaseBoltTest;
-import org.apache.storm.hbase.bolt.mapper.HBaseMapper;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.ArgumentCaptor;
 
 import java.io.IOException;
 import java.util.Collections;
-import java.util.List;
 
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
@@ -52,7 +52,6 @@ public class HBaseBoltTest extends BaseBoltTest {
   private Tuple tuple2;
   private Widget widget1;
   private Widget widget2;
-  private HBaseMapper mapper;
 
   @Before
   public void setupTuples() throws Exception {
@@ -68,7 +67,6 @@ public class HBaseBoltTest extends BaseBoltTest {
 
   @Before
   public void setup() throws Exception {
-    mapper = new WidgetMapper();
     tuple1 = mock(Tuple.class);
     tuple2 = mock(Tuple.class);
     client = mock(HBaseClient.class);
@@ -77,7 +75,7 @@ public class HBaseBoltTest extends BaseBoltTest {
   /**
    * Create a ProfileBuilderBolt to test
    */
-  private HBaseBolt createBolt(int batchSize) throws IOException {
+  private HBaseBolt createBolt(int batchSize, WidgetMapper mapper) throws IOException {
     HBaseBolt bolt = new HBaseBolt(tableName, mapper)
             .withBatchSize(batchSize);
     bolt.prepare(Collections.emptyMap(), topologyContext, outputCollector);
@@ -86,17 +84,18 @@ public class HBaseBoltTest extends BaseBoltTest {
   }
 
   /**
-   * What happens if the batch is full?
+   * What happens if the batch is ready to flush?
    *
    * If the batch size is 2 and we have received 2 tuples the batch should be flushed.
    */
   @Test
   public void testBatchReady() throws Exception {
-    HBaseBolt bolt = createBolt(2);
+    HBaseBolt bolt = createBolt(2, new WidgetMapper());
     bolt.execute(tuple1);
     bolt.execute(tuple2);
 
     // batch size is 2, received 2 tuples - flush the batch
+    verify(client, times(2)).addMutation(any(), any(), any());
     verify(client, times(1)).mutate();
   }
 
@@ -105,10 +104,11 @@ public class HBaseBoltTest extends BaseBoltTest {
    */
   @Test
   public void testBatchNotReady() throws Exception {
-    HBaseBolt bolt = createBolt(2);
+    HBaseBolt bolt = createBolt(2, new WidgetMapper());
     bolt.execute(tuple1);
 
-    // batch size is 2, but only 1 tuple received - do not flush batch
+    // 1 put was added to the batch, but nothing was flushed
+    verify(client, times(1)).addMutation(any(), any(), any());
     verify(client, times(0)).mutate();
   }
 
@@ -117,10 +117,11 @@ public class HBaseBoltTest extends BaseBoltTest {
    */
   @Test
   public void testTimeFlush() throws Exception {
-    HBaseBolt bolt = createBolt(2);
+    HBaseBolt bolt = createBolt(2, new WidgetMapper());
 
     // the batch is not ready to write
     bolt.execute(tuple1);
+    verify(client, times(1)).addMutation(any(), any(), any());
     verify(client, times(0)).mutate();
 
     // the batch should be flushed after the tick tuple
@@ -128,6 +129,30 @@ public class HBaseBoltTest extends BaseBoltTest {
     verify(client, times(1)).mutate();
   }
 
+  /**
+   * The mapper can define a TTL that the HBaseBolt uses to determine
+   * if the Put to Hbase needs the TTL set.
+   */
+  @Test
+  public void testWriteWithTTL() throws Exception {
+
+    // setup - create a mapper with a TTL set
+    final Long expectedTTL = 2000L;
+    WidgetMapper mapperWithTTL = new WidgetMapper(expectedTTL);
+
+    // execute
+    HBaseBolt bolt = createBolt(2, mapperWithTTL);
+    bolt.execute(tuple1);
+    bolt.execute(tuple2);
+
+    // used to grab the actual TTL
+    ArgumentCaptor<Long> ttlCaptor = ArgumentCaptor.forClass(Long.class);
+
+    // validate - ensure the Puts written with the TTL
+    verify(client, times(2)).addMutation(any(), any(), any(), ttlCaptor.capture());
+    Assert.assertEquals(expectedTTL, ttlCaptor.getValue());
+  }
+
   private static Tuple mockTuple(String componentId, String streamId) {
     Tuple tuple = mock(Tuple.class);
     when(tuple.getSourceComponent()).thenReturn(componentId);

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3a2ecc40/metron-platform/metron-hbase/src/test/java/org/apache/metron/hbase/client/HBaseClientTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-hbase/src/test/java/org/apache/metron/hbase/client/HBaseClientTest.java b/metron-platform/metron-hbase/src/test/java/org/apache/metron/hbase/client/HBaseClientTest.java
index e2afd1c..00a1126 100644
--- a/metron-platform/metron-hbase/src/test/java/org/apache/metron/hbase/client/HBaseClientTest.java
+++ b/metron-platform/metron-hbase/src/test/java/org/apache/metron/hbase/client/HBaseClientTest.java
@@ -30,9 +30,9 @@ import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.metron.hbase.Widget;
 import org.apache.metron.hbase.WidgetMapper;
-import org.apache.storm.hbase.bolt.mapper.HBaseMapper;
-import org.apache.storm.hbase.bolt.mapper.HBaseProjectionCriteria;
-import org.apache.storm.hbase.common.ColumnList;
+import org.apache.metron.hbase.bolt.mapper.ColumnList;
+import org.apache.metron.hbase.bolt.mapper.HBaseMapper;
+import org.apache.metron.hbase.bolt.mapper.HBaseProjectionCriteria;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;

Reply via email to