This is an automated email from the ASF dual-hosted git repository. nickallen pushed a commit to branch feature/METRON-2088-support-hdp-3.1 in repository https://gitbox.apache.org/repos/asf/metron.git
The following commit(s) were added to refs/heads/feature/METRON-2088-support-hdp-3.1 by this push: new ef3a76c METRON-2175 Introduce HBase Connection Abstractions for HBase 2.0.2 (nickwallen) closes apache/metron#1456 ef3a76c is described below commit ef3a76c799d48856d8e4999226ec9392f298a2f7 Author: nickwallen <n...@nickallen.org> AuthorDate: Thu Jul 18 09:56:31 2019 -0400 METRON-2175 Introduce HBase Connection Abstractions for HBase 2.0.2 (nickwallen) closes apache/metron#1456 --- .../metron/profiler/client/ProfileWriter.java | 6 +- .../spark/function/HBaseWriterFunction.java | 4 +- .../org/apache/metron/hbase/bolt/HBaseBolt.java | 8 +- .../apache/metron/hbase/bolt/HBaseBoltTest.java | 6 +- .../org/apache/metron/rest/config/HBaseConfig.java | 6 +- .../impl/SensorEnrichmentConfigServiceImpl.java | 6 +- .../org/apache/metron/rest/config/TestConfig.java | 6 +- .../SensorEnrichmentConfigServiceImplTest.java | 6 +- .../metron-elasticsearch-common/pom.xml | 18 ++ .../metron/hbase/coprocessor/HBaseCacheWriter.java | 4 +- .../java/org/apache/metron/hbase/ColumnList.java | 97 ++++++- .../metron/hbase/HBaseProjectionCriteria.java | 10 +- .../apache/metron/hbase/client/HBaseClient.java | 317 ++++----------------- .../metron/hbase/client/HBaseClientFactory.java | 70 +++++ .../hbase/client/HBaseConnectionFactory.java | 59 ++++ .../metron/hbase/client/HBaseTableClient.java | 285 ++++++++++++++++++ .../hbase/client/HBaseTableClientFactory.java | 54 ++++ .../metron/hbase/client/HBaseWriterParams.java | 51 ++++ .../{HBaseClient.java => LegacyHBaseClient.java} | 4 +- .../metron/hbase/client/HBaseClientTest.java | 10 +- .../HBaseTableClientIntegrationTest.java | 286 +++++++++++++++++++ metron-platform/metron-pcap/pom.xml | 5 + pom.xml | 26 +- 23 files changed, 1039 insertions(+), 305 deletions(-) diff --git a/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/ProfileWriter.java 
b/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/ProfileWriter.java index 4e00164..d4e5e66 100644 --- a/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/ProfileWriter.java +++ b/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/ProfileWriter.java @@ -26,7 +26,7 @@ import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.metron.hbase.HTableProvider; import org.apache.metron.hbase.ColumnList; -import org.apache.metron.hbase.client.HBaseClient; +import org.apache.metron.hbase.client.LegacyHBaseClient; import org.apache.metron.profiler.ProfileMeasurement; import org.apache.metron.profiler.ProfilePeriod; import org.apache.metron.profiler.hbase.ColumnBuilder; @@ -47,13 +47,13 @@ public class ProfileWriter { private RowKeyBuilder rowKeyBuilder; private ColumnBuilder columnBuilder; - private HBaseClient hbaseClient; + private LegacyHBaseClient hbaseClient; private HBaseProfilerClient client; public ProfileWriter(RowKeyBuilder rowKeyBuilder, ColumnBuilder columnBuilder, HTableInterface table, long periodDurationMillis) { this.rowKeyBuilder = rowKeyBuilder; this.columnBuilder = columnBuilder; - this.hbaseClient = new HBaseClient((c, t) -> table, table.getConfiguration(), table.getName().getNameAsString()); + this.hbaseClient = new LegacyHBaseClient((c, t) -> table, table.getConfiguration(), table.getName().getNameAsString()); this.client = new HBaseProfilerClient(table, rowKeyBuilder, columnBuilder, periodDurationMillis); } diff --git a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/HBaseWriterFunction.java b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/HBaseWriterFunction.java index cfabd94..6a090cf 100644 --- 
a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/HBaseWriterFunction.java +++ b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/HBaseWriterFunction.java @@ -26,7 +26,7 @@ import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.Durability; import org.apache.metron.hbase.HTableProvider; import org.apache.metron.hbase.TableProvider; -import org.apache.metron.hbase.client.HBaseClient; +import org.apache.metron.hbase.client.LegacyHBaseClient; import org.apache.metron.profiler.ProfileMeasurement; import org.apache.metron.profiler.hbase.ColumnBuilder; import org.apache.metron.profiler.hbase.RowKeyBuilder; @@ -118,7 +118,7 @@ public class HBaseWriterFunction implements MapPartitionsFunction<ProfileMeasure // open an HBase connection Configuration config = HBaseConfiguration.create(); - try (HBaseClient client = new HBaseClient(tableProvider, config, tableName)) { + try (LegacyHBaseClient client = new LegacyHBaseClient(tableProvider, config, tableName)) { for (ProfileMeasurementAdapter adapter : measurements) { ProfileMeasurement m = adapter.toProfileMeasurement(); diff --git a/metron-analytics/metron-profiler-storm/src/main/java/org/apache/metron/hbase/bolt/HBaseBolt.java b/metron-analytics/metron-profiler-storm/src/main/java/org/apache/metron/hbase/bolt/HBaseBolt.java index ec860a5..07bd552 100644 --- a/metron-analytics/metron-profiler-storm/src/main/java/org/apache/metron/hbase/bolt/HBaseBolt.java +++ b/metron-analytics/metron-profiler-storm/src/main/java/org/apache/metron/hbase/bolt/HBaseBolt.java @@ -31,7 +31,7 @@ import org.apache.metron.hbase.HTableProvider; import org.apache.metron.hbase.TableProvider; import org.apache.metron.hbase.ColumnList; import org.apache.metron.hbase.bolt.mapper.HBaseMapper; -import org.apache.metron.hbase.client.HBaseClient; +import org.apache.metron.hbase.client.LegacyHBaseClient; import org.apache.storm.Config; 
import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; @@ -90,7 +90,7 @@ public class HBaseBolt extends BaseRichBolt { private BatchHelper batchHelper; protected OutputCollector collector; - protected transient HBaseClient hbaseClient; + protected transient LegacyHBaseClient hbaseClient; public HBaseBolt(String tableName, HBaseMapper mapper) { this.tableName = tableName; @@ -122,7 +122,7 @@ public class HBaseBolt extends BaseRichBolt { return this; } - public void setClient(HBaseClient hbaseClient) { + public void setClient(LegacyHBaseClient hbaseClient) { this.hbaseClient = hbaseClient; } @@ -147,7 +147,7 @@ public class HBaseBolt extends BaseRichBolt { provider = this.tableProvider; } - hbaseClient = new HBaseClient(provider, HBaseConfiguration.create(), tableName); + hbaseClient = new LegacyHBaseClient(provider, HBaseConfiguration.create(), tableName); } @Override diff --git a/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/hbase/bolt/HBaseBoltTest.java b/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/hbase/bolt/HBaseBoltTest.java index bae3728..9146aff 100644 --- a/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/hbase/bolt/HBaseBoltTest.java +++ b/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/hbase/bolt/HBaseBoltTest.java @@ -25,7 +25,7 @@ import org.apache.storm.Constants; import org.apache.storm.tuple.Tuple; import org.apache.metron.hbase.bolt.mapper.Widget; import org.apache.metron.hbase.bolt.mapper.WidgetMapper; -import org.apache.metron.hbase.client.HBaseClient; +import org.apache.metron.hbase.client.LegacyHBaseClient; import org.apache.metron.test.bolt.BaseBoltTest; import org.junit.Assert; import org.junit.Before; @@ -48,7 +48,7 @@ import static org.mockito.Mockito.when; public class HBaseBoltTest extends BaseBoltTest { private static final String tableName = "widgets"; - private HBaseClient client; + private 
LegacyHBaseClient client; private Tuple tuple1; private Tuple tuple2; private Widget widget1; @@ -71,7 +71,7 @@ public class HBaseBoltTest extends BaseBoltTest { public void setup() throws Exception { tuple1 = mock(Tuple.class); tuple2 = mock(Tuple.class); - client = mock(HBaseClient.class); + client = mock(LegacyHBaseClient.class); provider = mock(TableProvider.class); } diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/HBaseConfig.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/HBaseConfig.java index 7ce16f9..c1476e8 100644 --- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/HBaseConfig.java +++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/HBaseConfig.java @@ -25,7 +25,7 @@ import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.metron.common.configuration.EnrichmentConfigurations; import org.apache.metron.hbase.HTableProvider; import org.apache.metron.hbase.TableProvider; -import org.apache.metron.hbase.client.HBaseClient; +import org.apache.metron.hbase.client.LegacyHBaseClient; import org.apache.metron.rest.RestException; import org.apache.metron.rest.service.GlobalConfigService; import org.apache.metron.rest.user.UserSettingsClient; @@ -60,7 +60,7 @@ public class HBaseConfig { } @Bean() - public HBaseClient hBaseClient() { + public LegacyHBaseClient hBaseClient() { Map<String, Object> restConfig = null; try { restConfig = globalConfigService.get(); @@ -75,7 +75,7 @@ public class HBaseConfig { } catch (ClassNotFoundException | InstantiationException | InvocationTargetException | IllegalAccessException | NoSuchMethodException e) { throw new IllegalStateException("Unable to create table provider", e); } - return new HBaseClient(provider, HBaseConfiguration.create(), + return new LegacyHBaseClient(provider, HBaseConfiguration.create(), (String) restConfig.get(EnrichmentConfigurations.TABLE_NAME)); } diff --git 
a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorEnrichmentConfigServiceImpl.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorEnrichmentConfigServiceImpl.java index 5c0f2e0..d7f7b7f 100644 --- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorEnrichmentConfigServiceImpl.java +++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorEnrichmentConfigServiceImpl.java @@ -32,7 +32,7 @@ import org.apache.metron.common.configuration.ConfigurationsUtils; import org.apache.metron.common.configuration.EnrichmentConfigurations; import org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig; import org.apache.metron.common.zookeeper.ConfigurationsCache; -import org.apache.metron.hbase.client.HBaseClient; +import org.apache.metron.hbase.client.LegacyHBaseClient; import org.apache.metron.rest.RestException; import org.apache.metron.rest.service.SensorEnrichmentConfigService; import org.apache.zookeeper.KeeperException; @@ -48,12 +48,12 @@ public class SensorEnrichmentConfigServiceImpl implements SensorEnrichmentConfig private ConfigurationsCache cache; - private HBaseClient hBaseClient; + private LegacyHBaseClient hBaseClient; @Autowired public SensorEnrichmentConfigServiceImpl(final ObjectMapper objectMapper, final CuratorFramework client, final ConfigurationsCache cache, - final HBaseClient hBaseClient) { + final LegacyHBaseClient hBaseClient) { this.objectMapper = objectMapper; this.client = client; this.cache = cache; diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/TestConfig.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/TestConfig.java index d42f128..b3a478b 100644 --- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/TestConfig.java +++ 
b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/TestConfig.java @@ -46,7 +46,7 @@ import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.metron.common.configuration.ConfigurationsUtils; import org.apache.metron.common.zookeeper.ConfigurationsCache; import org.apache.metron.common.zookeeper.ZKConfigurationsCache; -import org.apache.metron.hbase.client.HBaseClient; +import org.apache.metron.hbase.client.LegacyHBaseClient; import org.apache.metron.hbase.mock.MockHBaseTableProvider; import org.apache.metron.integration.ComponentRunner; import org.apache.metron.integration.UnableToStartException; @@ -202,7 +202,7 @@ public class TestConfig { } @Bean() - public HBaseClient hBaseClient() throws RestException, IOException { + public LegacyHBaseClient hBaseClient() throws RestException, IOException { final String cf = "t"; final String cq = "v"; HTableInterface table = MockHBaseTableProvider.addToCache("enrichment_list", cf); @@ -216,7 +216,7 @@ public class TestConfig { put.addColumn(Bytes.toBytes(cf), Bytes.toBytes(cq), "{}".getBytes(StandardCharsets.UTF_8)); table.put(put); } - return new HBaseClient(new MockHBaseTableProvider(), HBaseConfiguration.create(), + return new LegacyHBaseClient(new MockHBaseTableProvider(), HBaseConfiguration.create(), "enrichment_list"); } diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/SensorEnrichmentConfigServiceImplTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/SensorEnrichmentConfigServiceImplTest.java index 3d07da5..4176121 100644 --- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/SensorEnrichmentConfigServiceImplTest.java +++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/SensorEnrichmentConfigServiceImplTest.java @@ -42,7 +42,7 @@ import org.apache.metron.common.configuration.enrichment.EnrichmentConfig; import 
org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig; import org.apache.metron.common.configuration.enrichment.threatintel.ThreatIntelConfig; import org.apache.metron.common.zookeeper.ConfigurationsCache; -import org.apache.metron.hbase.client.HBaseClient; +import org.apache.metron.hbase.client.LegacyHBaseClient; import org.apache.metron.rest.RestException; import org.apache.metron.rest.service.SensorEnrichmentConfigService; import org.apache.zookeeper.KeeperException; @@ -82,14 +82,14 @@ public class SensorEnrichmentConfigServiceImplTest { public static String broJson; ConfigurationsCache cache; - private HBaseClient hBaseClient; + private LegacyHBaseClient hBaseClient; @Before public void setUp() throws Exception { objectMapper = mock(ObjectMapper.class); curatorFramework = mock(CuratorFramework.class); cache = mock(ConfigurationsCache.class); - hBaseClient = mock(HBaseClient.class); + hBaseClient = mock(LegacyHBaseClient.class); sensorEnrichmentConfigService = new SensorEnrichmentConfigServiceImpl(objectMapper, curatorFramework, cache, hBaseClient); } diff --git a/metron-platform/metron-elasticsearch/metron-elasticsearch-common/pom.xml b/metron-platform/metron-elasticsearch/metron-elasticsearch-common/pom.xml index 190bcb2..ce61df3 100644 --- a/metron-platform/metron-elasticsearch/metron-elasticsearch-common/pom.xml +++ b/metron-platform/metron-elasticsearch/metron-elasticsearch-common/pom.xml @@ -59,9 +59,23 @@ <groupId>log4j</groupId> <artifactId>log4j</artifactId> </exclusion> + <exclusion> + <artifactId>commons-httpclient</artifactId> + <groupId>commons-httpclient</groupId> + </exclusion> </exclusions> </dependency> <dependency> + <groupId>org.apache.httpcomponents</groupId> + <artifactId>httpclient</artifactId> + <version>4.4.1</version> + </dependency> + <dependency> + <groupId>org.apache.httpcomponents</groupId> + <artifactId>httpcore</artifactId> + <version>4.4.9</version> + </dependency> + <dependency> 
<groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>${guava_version}</version> @@ -178,6 +192,10 @@ <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> </exclusion> + <exclusion> + <artifactId>commons-httpclient</artifactId> + <groupId>commons-httpclient</groupId> + </exclusion> </exclusions> </dependency> <dependency> diff --git a/metron-platform/metron-hbase-server/src/main/java/org/apache/metron/hbase/coprocessor/HBaseCacheWriter.java b/metron-platform/metron-hbase-server/src/main/java/org/apache/metron/hbase/coprocessor/HBaseCacheWriter.java index b1bbdde..5d19960 100644 --- a/metron-platform/metron-hbase-server/src/main/java/org/apache/metron/hbase/coprocessor/HBaseCacheWriter.java +++ b/metron-platform/metron-hbase-server/src/main/java/org/apache/metron/hbase/coprocessor/HBaseCacheWriter.java @@ -26,7 +26,7 @@ import javax.annotation.Nonnull; import javax.annotation.Nullable; import org.apache.hadoop.conf.Configuration; import org.apache.metron.hbase.TableProvider; -import org.apache.metron.hbase.client.HBaseClient; +import org.apache.metron.hbase.client.LegacyHBaseClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -59,7 +59,7 @@ public class HBaseCacheWriter implements CacheWriter<String, String> { @Override public void write(@Nonnull String key, @Nonnull String value) { LOG.debug("Calling hbase cache writer with key='{}', value='{}'", key, value); - try (HBaseClient hbClient = new HBaseClient(this.tableProvider, this.config, this.tableName)) { + try (LegacyHBaseClient hbClient = new LegacyHBaseClient(this.tableProvider, this.config, this.tableName)) { LOG.debug("rowKey={}, columnFamily={}, columnQualifier={}, value={}", key, columnFamily, columnQualifier, value); hbClient.put(key, columnFamily, columnQualifier, value); diff --git a/metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/ColumnList.java 
b/metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/ColumnList.java index 01a5cc6..33af434 100644 --- a/metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/ColumnList.java +++ b/metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/ColumnList.java @@ -20,8 +20,12 @@ package org.apache.metron.hbase; +import org.apache.hadoop.hbase.util.Bytes; + import java.util.ArrayList; +import java.util.Arrays; import java.util.List; +import java.util.Objects; /** * Represents a list of HBase columns. @@ -76,6 +80,30 @@ public class ColumnList { public long getTs() { return ts; } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (!(o instanceof Column)) return false; + Column column = (Column) o; + return ts == column.ts && + Arrays.equals(value, column.value); + } + + @Override + public int hashCode() { + int result = Objects.hash(ts); + result = 31 * result + Arrays.hashCode(value); + return result; + } + + @Override + public String toString() { + return "Column{" + + "value=" + Arrays.toString(value) + + ", ts=" + ts + + '}'; + } } public static class Counter extends AbstractColumn { @@ -88,6 +116,26 @@ public class ColumnList { public long getIncrement() { return incr; } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (!(o instanceof Counter)) return false; + Counter counter = (Counter) o; + return incr == counter.incr; + } + + @Override + public int hashCode() { + return Objects.hash(incr); + } + + @Override + public String toString() { + return "Counter{" + + "incr=" + incr + + '}'; + } } private ArrayList<ColumnList.Column> columns; @@ -123,6 +171,24 @@ public class ColumnList { return this; } + public ColumnList addColumn(byte[] family, byte[] qualifier){ + addColumn(new Column(family, qualifier, -1, null)); + return this; + } + + public ColumnList addColumn(String family, String qualifier){ + 
addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier)); + return this; + } + + public ColumnList addColumn(String family, String qualifier, byte[] value){ + return addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier), value); + } + + public ColumnList addColumn(String family, String qualifier, String value){ + return addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier), Bytes.toBytes(value)); + } + /** * Add a standard HBase column given an instance of a class that implements * the <code>IColumn</code> interface. @@ -131,6 +197,11 @@ public class ColumnList { return this.addColumn(column.family(), column.qualifier(), column.timestamp(), column.value()); } + public ColumnList addColumn(Column column){ + columns().add(column); + return this; + } + /** * Add an HBase counter column. */ @@ -152,14 +223,14 @@ public class ColumnList { * Query to determine if we have column definitions. */ public boolean hasColumns(){ - return this.columns != null; + return columns != null && columns.size() > 0; } /** * Query to determine if we have counter definitions. 
*/ public boolean hasCounters(){ - return this.counters != null; + return this.counters != null && counters.size() > 0; } /** @@ -175,4 +246,26 @@ public class ColumnList { public List<Counter> getCounters(){ return this.counters; } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (!(o instanceof ColumnList)) return false; + ColumnList that = (ColumnList) o; + return Objects.equals(columns, that.columns) && + Objects.equals(counters, that.counters); + } + + @Override + public int hashCode() { + return Objects.hash(columns, counters); + } + + @Override + public String toString() { + return "ColumnList{" + + "columns=" + columns + + ", counters=" + counters + + '}'; + } } \ No newline at end of file diff --git a/metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/HBaseProjectionCriteria.java b/metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/HBaseProjectionCriteria.java index 7432b02..6bacfb2 100644 --- a/metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/HBaseProjectionCriteria.java +++ b/metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/HBaseProjectionCriteria.java @@ -40,12 +40,16 @@ public class HBaseProjectionCriteria implements Serializable { public static class ColumnMetaData implements Serializable { - private byte[] columnFamily; + private byte[] columnFamily; private byte[] qualifier; public ColumnMetaData(String columnFamily, String qualifier) { - this.columnFamily = columnFamily.getBytes(); - this.qualifier = qualifier.getBytes(); + this(columnFamily.getBytes(), qualifier.getBytes()); + } + + public ColumnMetaData(byte[] columnFamily, byte[] qualifier) { + this.columnFamily = columnFamily; + this.qualifier = qualifier; } public byte[] getColumnFamily() { diff --git a/metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/client/HBaseClient.java 
b/metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/client/HBaseClient.java index d0d934e..f51927a 100644 --- a/metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/client/HBaseClient.java +++ b/metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/client/HBaseClient.java @@ -17,321 +17,114 @@ * limitations under the License. * */ - package org.apache.metron.hbase.client; -import static org.apache.commons.collections4.CollectionUtils.size; - -import java.io.Closeable; -import java.io.IOException; -import java.lang.invoke.MethodHandles; -import java.util.ArrayList; -import java.util.List; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.client.Durability; -import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.HTableInterface; -import org.apache.hadoop.hbase.client.Increment; -import org.apache.hadoop.hbase.client.Mutation; -import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.metron.hbase.TableProvider; import org.apache.metron.hbase.ColumnList; import org.apache.metron.hbase.HBaseProjectionCriteria; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.IOException; +import java.util.List; /** * A client that interacts with HBase. */ -public class HBaseClient implements Closeable { - - private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); +public interface HBaseClient extends Closeable { /** - * The batch of queued Mutations. + * Enqueues a 'get' request that will be submitted when {@link #getAll()} is called. + * @param rowKey The row key to be retrieved. 
*/ - List<Mutation> mutations; + void addGet(byte[] rowKey, HBaseProjectionCriteria criteria); /** - * The batch of queued Gets. + * Submits all pending get operations and returns the result of each. + * @return The result of each pending get request. */ - List<Get> gets; - - /** - * The HBase table this client interacts with. - */ - private HTableInterface table; - - public HBaseClient(TableProvider provider, final Configuration configuration, final String tableName) { - this.mutations = new ArrayList<>(); - this.gets = new ArrayList<>(); - try { - this.table = provider.getTable(configuration, tableName); - } catch (Exception e) { - String msg = String.format("Unable to open connection to HBase for table '%s'", tableName); - LOG.error(msg, e); - throw new RuntimeException(msg, e); - } - } + Result[] getAll(); /** - * Add a Mutation such as a Put or Increment to the batch. The Mutation is only queued for - * later execution. - * - * @param rowKey The row key of the Mutation. - * @param cols The columns affected by the Mutation. - * @param durability The durability of the mutation. + * Clears all pending get operations. */ - public void addMutation(byte[] rowKey, ColumnList cols, Durability durability) { - - if (cols.hasColumns()) { - Put put = createPut(rowKey, cols, durability); - mutations.add(put); - } - - if (cols.hasCounters()) { - Increment inc = createIncrement(rowKey, cols, durability); - mutations.add(inc); - } - - if (mutations.isEmpty()) { - mutations.add(new Put(rowKey)); - } - } + void clearGets(); /** - * Adds a Mutation such as a Put or Increment with a time to live. The Mutation is only queued - * for later execution. + * Scans an entire table returning all row keys as a List of Strings. * - * @param rowKey The row key of the Mutation. - * @param cols The columns affected by the Mutation. - * @param durability The durability of the mutation. - * @param timeToLiveMillis The time to live in milliseconds. 
- */ - public void addMutation(byte[] rowKey, ColumnList cols, Durability durability, Long timeToLiveMillis) { - - if (cols.hasColumns()) { - Put put = createPut(rowKey, cols, durability, timeToLiveMillis); - mutations.add(put); - } - - if (cols.hasCounters()) { - Increment inc = createIncrement(rowKey, cols, durability, timeToLiveMillis); - mutations.add(inc); - } - - if (mutations.isEmpty()) { - Put put = new Put(rowKey); - put.setTTL(timeToLiveMillis); - mutations.add(put); - } - } - - /** - * Remove all queued Mutations from the batch. - */ - public void clearMutations() { - mutations.clear(); - } - - /** - * Submits all queued Mutations. - * @return The number of mutation submitted. - */ - public int mutate() { - int mutationCount = mutations.size(); - Object[] result = new Object[mutationCount]; - try { - table.batch(mutations, result); - mutations.clear(); - - } catch (Exception e) { - String msg = String.format("'%d' HBase write(s) failed on table '%s'", size(mutations), tableName(table)); - LOG.error(msg, e); - throw new RuntimeException(msg, e); - } - - return mutationCount; - } - - /** - * Adds a Get to the batch. + * <p><b>**WARNING**:</b> Do not use this method unless you're absolutely crystal clear about the performance + * impact. Doing full table scans in HBase can adversely impact performance. * - * @param rowKey The row key of the Get - * @param criteria Defines the columns/families that will be retrieved. - */ - public void addGet(byte[] rowKey, HBaseProjectionCriteria criteria) { - Get get = new Get(rowKey); - - if (criteria != null) { - criteria.getColumnFamilies().forEach(cf -> get.addFamily(cf)); - criteria.getColumns().forEach(col -> get.addColumn(col.getColumnFamily(), col.getQualifier())); - } - - // queue the get - this.gets.add(get); - } - - /** - * Clears all queued Gets from the batch. + * @return List of all row keys as Strings for this table. 
*/ - public void clearGets() { - gets.clear(); - } + List<String> scanRowKeys() throws IOException; /** - * Submit all queued Gets. + * Scans the table and returns each result. * - * @return The Result of each queued Get. - */ - public Result[] getAll() { - try { - Result[] results = table.get(gets); - gets.clear(); - return results; - - } catch (Exception e) { - String msg = String.format("'%d' HBase read(s) failed on table '%s'", size(gets), tableName(table)); - LOG.error(msg, e); - throw new RuntimeException(msg, e); - } - } - - /** - * Close the table. - */ - @Override - public void close() throws IOException { - if(table != null) { - table.close(); - } - } - - /** - * Creates an HBase Put. + * <p><b>**WARNING**:</b> Do not use this method unless you're absolutely crystal clear about the performance + * impact. Doing full table scans in HBase can adversely impact performance. * - * @param rowKey The row key. - * @param cols The columns to put. - * @param durability The durability of the put. + * @return The results from the scan. + * @throws IOException */ - private Put createPut(byte[] rowKey, ColumnList cols, Durability durability) { - Put put = new Put(rowKey); - put.setDurability(durability); - addColumns(cols, put); - return put; - } + Result[] scan(int numRows) throws IOException; /** - * Creates an HBase Put. + * Enqueues a {@link org.apache.hadoop.hbase.client.Mutation} such as a put or + * increment. The operation is enqueued for later execution. * - * @param rowKey The row key. - * @param cols The columns to put. - * @param durability The durability of the put. - * @param timeToLiveMillis The TTL in milliseconds. + * @param rowKey The row key of the Mutation. + * @param cols The columns affected by the Mutation. 
*/ - private Put createPut(byte[] rowKey, ColumnList cols, Durability durability, long timeToLiveMillis) { - Put put = new Put(rowKey); - put.setDurability(durability); - put.setTTL(timeToLiveMillis); - addColumns(cols, put); - return put; - } + void addMutation(byte[] rowKey, ColumnList cols); /** - * Adds the columns to the Put + * Enqueues a {@link org.apache.hadoop.hbase.client.Mutation} such as a put or + * increment. The operation is enqueued for later execution. * - * @param cols The columns to add. - * @param put The Put. + * @param rowKey The row key of the Mutation. + * @param cols The columns affected by the Mutation. + * @param durability The durability of the mutation. */ - private void addColumns(ColumnList cols, Put put) { - for (ColumnList.Column col : cols.getColumns()) { - - if (col.getTs() > 0) { - put.add(col.getFamily(), col.getQualifier(), col.getTs(), col.getValue()); - - } else { - put.add(col.getFamily(), col.getQualifier(), col.getValue()); - } - } - } + void addMutation(byte[] rowKey, ColumnList cols, Durability durability); /** - * Creates an HBase Increment for a counter. + * Enqueues a {@link org.apache.hadoop.hbase.client.Mutation} such as a put or + * increment. The operation is enqueued for later execution. * - * @param rowKey The row key. - * @param cols The columns to include. - * @param durability The durability of the increment. + * @param rowKey The row key of the Mutation. + * @param cols The columns affected by the Mutation. + * @param durability The durability of the mutation. + * @param timeToLiveMillis The time to live in milliseconds. 
*/ - private Increment createIncrement(byte[] rowKey, ColumnList cols, Durability durability) { - Increment inc = new Increment(rowKey); - inc.setDurability(durability); - cols.getCounters().forEach(cnt -> inc.addColumn(cnt.getFamily(), cnt.getQualifier(), cnt.getIncrement())); - return inc; - } + void addMutation(byte[] rowKey, ColumnList cols, Durability durability, Long timeToLiveMillis); /** - * Creates an HBase Increment for a counter. + * Ensures that all pending mutations have completed. * - * @param rowKey The row key. - * @param cols The columns to include. - * @param durability The durability of the increment. + * @return The number of operations completed. */ - private Increment createIncrement(byte[] rowKey, ColumnList cols, Durability durability, long timeToLiveMillis) { - Increment inc = new Increment(rowKey); - inc.setDurability(durability); - inc.setTTL(timeToLiveMillis); - cols.getCounters().forEach(cnt -> inc.addColumn(cnt.getFamily(), cnt.getQualifier(), cnt.getIncrement())); - return inc; - } + int mutate(); /** - * Returns the name of the HBase table. - * <p>Attempts to avoid any null pointers that might be encountered along the way. - * @param table The table to retrieve the name of. - * @return The name of the table + * Clears all pending mutations. */ - private static String tableName(HTableInterface table) { - String tableName = "null"; - if(table != null) { - if(table.getName() != null) { - tableName = table.getName().getNameAsString(); - } - } - return tableName; - } + void clearMutations(); /** - * Puts a record into the configured HBase table synchronously (not batched). + * Delete a record by row key. + * + * @param rowKey The row key to delete. 
*/ - public void put(String rowKey, String columnFamily, String columnQualifier, String value) - throws IOException { - Put put = new Put(Bytes.toBytes(rowKey)); - put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(columnQualifier), - Bytes.toBytes(value)); - table.put(put); - } + void delete(byte[] rowKey); /** - * Scans an entire table returning all row keys as a List of Strings. + * Delete a column or set of columns by row key. * - * <p> - * <b>**WARNING**:</b> Do not use this method unless you're absolutely crystal clear about the performance - * impact. Doing full table scans in HBase can adversely impact performance. - * - * @return List of all row keys as Strings for this table. + * @param rowKey The row key to delete. + * @param columnList The set of columns to delete. */ - public List<String> readRecords() throws IOException { - Scan scan = new Scan(); - ResultScanner scanner = table.getScanner(scan); - List<String> rows = new ArrayList<>(); - for (Result r = scanner.next(); r != null; r = scanner.next()) { - rows.add(Bytes.toString(r.getRow())); - } - return rows; - } - + void delete(byte[] rowKey, ColumnList columnList); } diff --git a/metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/client/HBaseClientFactory.java b/metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/client/HBaseClientFactory.java new file mode 100644 index 0000000..b38b07b --- /dev/null +++ b/metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/client/HBaseClientFactory.java @@ -0,0 +1,70 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.metron.hbase.client;
+
+import org.apache.hadoop.conf.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.lang.invoke.MethodHandles;
+import java.lang.reflect.InvocationTargetException;
+import java.util.function.Supplier;
+
+/**
+ * Responsible for creating an {@link HBaseClient}.
+ */
+public interface HBaseClientFactory extends Serializable {
+  Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  /**
+   * @param factory The connection factory for creating connections to HBase.
+   * @param configuration The HBase configuration.
+   * @param tableName The name of the HBase table.
+   * @return An {@link HBaseClient} for the given table.
+   */
+  HBaseClient create(HBaseConnectionFactory factory, Configuration configuration, String tableName);
+
+  /**
+   * Instantiates a new {@link HBaseClientFactory} by class name.
+   *
+   * @param className The fully-qualified class name of the {@link HBaseClientFactory} to instantiate.
+   * @param defaultImpl Supplies the factory to use when className is null, empty, or starts with '$'.
+   * @return A new {@link HBaseClientFactory}.
+   * @throws IllegalStateException If the class cannot be loaded or instantiated, or is not an {@link HBaseClientFactory}.
+   */
+  static HBaseClientFactory byName(String className, Supplier<HBaseClientFactory> defaultImpl) {
+    LOG.debug("Creating HBase client creator; className={}", className);
+
+    if(className == null || className.isEmpty() || className.charAt(0) == '$') {
+      // NOTE(review): a leading '$' looks like an unresolved "${...}" placeholder; confirm with callers
+      LOG.debug("Using default hbase client creator");
+      return defaultImpl.get();
+    }
+
+    try {
+      Class<? extends HBaseClientFactory> clazz = Class.forName(className).asSubclass(HBaseClientFactory.class);
+      return clazz.getConstructor().newInstance();
+    } catch(InstantiationException | IllegalAccessException | InvocationTargetException |
+            NoSuchMethodException | ClassNotFoundException | ClassCastException e) {
+      throw new IllegalStateException("Unable to instantiate HBaseClientFactory; className=" + className, e);
+    }
+  }
+}
diff --git a/metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/client/HBaseConnectionFactory.java b/metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/client/HBaseConnectionFactory.java
new file mode 100644
index 0000000..bda26c5
--- /dev/null
+++ b/metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/client/HBaseConnectionFactory.java
@@ -0,0 +1,59 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.metron.hbase.client;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.lang.invoke.MethodHandles;
+import java.lang.reflect.InvocationTargetException;
+
+/**
+ * Establishes a {@link Connection} to HBase.
+ */
+public class HBaseConnectionFactory implements Serializable {
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  public Connection createConnection(Configuration configuration) throws IOException {
+    return ConnectionFactory.createConnection(configuration);
+  }
+
+  /**
+   * Creates an {@link HBaseConnectionFactory} based on a fully-qualified class name.
+   *
+   * @param className The fully-qualified class name to instantiate; it must have a public no-argument constructor.
+   * @return A {@link HBaseConnectionFactory}.
+   * @throws IllegalStateException If the class cannot be loaded or instantiated, or is not an {@link HBaseConnectionFactory}.
+   */
+  public static HBaseConnectionFactory byName(String className) {
+    LOG.debug("Creating HBase connection factory; className={}", className);
+    try {
+      Class<? extends HBaseConnectionFactory> clazz = Class.forName(className).asSubclass(HBaseConnectionFactory.class);
+      return clazz.getConstructor().newInstance();
+    } catch (InstantiationException | NoSuchMethodException | IllegalAccessException | ClassNotFoundException | InvocationTargetException | ClassCastException e) {
+      throw new IllegalStateException("Unable to instantiate HBaseConnectionFactory.", e);
+    }
+  }
+}
diff --git a/metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/client/HBaseTableClient.java b/metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/client/HBaseTableClient.java
new file mode 100644
index 0000000..60d2328
--- /dev/null
+++ b/metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/client/HBaseTableClient.java
@@ -0,0 +1,285 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.metron.hbase.client;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.metron.hbase.ColumnList;
+import org.apache.metron.hbase.HBaseProjectionCriteria;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.commons.collections4.CollectionUtils.size;
+
+/**
+ * An {@link HBaseClient} that uses the {@link Table} API to interact with HBase.
+ */
+public class HBaseTableClient implements HBaseClient {
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private final List<Mutation> mutations = new ArrayList<>();
+  private final List<Get> gets = new ArrayList<>();
+  private final Connection connection;
+  private final Table table;
+
+  /**
+   * @param connectionFactory Creates connections to HBase.
+   * @param configuration The HBase configuration.
+   * @param tableName The name of the HBase table.
+   */
+  public HBaseTableClient(HBaseConnectionFactory connectionFactory, Configuration configuration, String tableName) throws IOException {
+    // resolve the table name first so that an invalid name cannot leave an open connection behind
+    TableName name = TableName.valueOf(tableName);
+    connection = connectionFactory.createConnection(configuration);
+    table = connection.getTable(name);
+  }
+
+  @Override
+  public void close() {
+    try {
+      if(table != null) {
+        table.close();
+      }
+    } catch(IOException e) {
+      LOG.error("Error while closing HBase table", e);
+    }
+
+    try {
+      if(connection != null) {
+        connection.close();
+      }
+    } catch(IOException e) {
+      LOG.error("Error while closing HBase connection",e);
+    }
+  }
+
+  @Override
+  public void addGet(byte[] rowKey, HBaseProjectionCriteria criteria) {
+    Get get = new Get(rowKey);
+
+    // define which column families and columns are needed
+    if (criteria != null) {
+      criteria.getColumnFamilies().forEach(cf -> get.addFamily(cf));
+      criteria.getColumns().forEach(col -> get.addColumn(col.getColumnFamily(), col.getQualifier()));
+    }
+
+    // queue the get
+    this.gets.add(get);
+  }
+
+  @Override
+  public Result[] getAll() {
+    try {
+      return table.get(gets);
+
+    } catch (Exception e) {
+      String msg = String.format("'%d' HBase read(s) failed on table '%s'", size(gets), tableName(table));
+      LOG.error(msg, e);
+      throw new RuntimeException(msg, e);
+
+    } finally {
+      gets.clear();
+    }
+  }
+
+  @Override
+  public void clearGets() {
+    gets.clear();
+  }
+
+  @Override
+  public List<String> scanRowKeys() throws IOException {
+    List<String> rowKeys = new ArrayList<>();
+    try (ResultScanner scanner = getScanner()) { // the scanner is closed even if the scan fails part-way
+      for (Result r = scanner.next(); r != null; r = scanner.next()) {
+        rowKeys.add(Bytes.toString(r.getRow()));
+      }
+    }
+    return rowKeys;
+  }
+
+  @Override
+  public Result[] scan(int numRows) throws IOException {
+    try (ResultScanner scanner = getScanner()) { // rows are fetched before the scanner is closed
+      return scanner.next(numRows);
+    }
+  }
+
+  private ResultScanner getScanner() throws IOException {
+    return table.getScanner(new Scan());
+  }
+
+  /**
+   * Returns the name of the HBase table.
+   * <p>Attempts to avoid any null pointers that might be encountered along the way.
+   * @param table The table to retrieve the name of.
+   * @return The name of the table, or the string "null" if the table or its name is null.
+   */
+  private static String tableName(Table table) {
+    String tableName = "null";
+    if(table != null && table.getName() != null) {
+      tableName = table.getName().getNameAsString();
+    }
+    return tableName;
+  }
+
+  @Override
+  public void addMutation(byte[] rowKey, ColumnList cols) {
+    HBaseWriterParams params = new HBaseWriterParams();
+    addMutation(rowKey, cols, params);
+  }
+
+  @Override
+  public void addMutation(byte[] rowKey, ColumnList cols, Durability durability) {
+    HBaseWriterParams params = new HBaseWriterParams()
+            .withDurability(durability);
+    addMutation(rowKey, cols, params);
+  }
+
+  @Override
+  public void addMutation(byte[] rowKey, ColumnList cols, Durability durability, Long timeToLiveMillis) {
+    HBaseWriterParams params = new HBaseWriterParams()
+            .withDurability(durability)
+            .withTimeToLive(timeToLiveMillis);
+    addMutation(rowKey, cols, params);
+  }
+
+  private void addMutation(byte[] rowKey, ColumnList cols, HBaseWriterParams params) {
+    if (cols.hasColumns()) {
+      Put put = createPut(rowKey, params);
+      addColumns(cols, put);
+      mutations.add(put);
+    }
+    if (cols.hasCounters()) {
+      Increment inc = createIncrement(rowKey, params);
+      addColumns(cols, inc);
+      mutations.add(inc);
+    }
+  }
+
+  @Override
+  public void clearMutations() {
+    mutations.clear();
+  }
+
+  @Override
+  public int mutate() {
+    int mutationCount = mutations.size();
+    if(mutationCount > 0) {
+      doMutate();
+    }
+
+    return mutationCount;
+  }
+
+  @Override
+  public void delete(byte[] rowKey) {
+    try {
+      Delete delete = new Delete(rowKey);
+      table.delete(delete);
+
+    } catch (Exception e) {
+      String msg = String.format("Unable to delete; table=%s", tableName(table));
+      LOG.error(msg, e);
+      throw new RuntimeException(msg, e);
+    }
+  }
+
+  @Override
+  public void delete(byte[] rowKey, ColumnList columnList) {
+    try {
+      Delete delete = new Delete(rowKey);
+      for(ColumnList.Column column: columnList.getColumns()) {
+        delete.addColumn(column.getFamily(), column.getQualifier()); // NOTE(review): addColumn targets only the newest cell version; confirm that is intended
+      }
+      table.delete(delete);
+
+    } catch (Exception e) {
+      String msg = String.format("Unable to delete; table=%s", tableName(table));
+      LOG.error(msg, e);
+      throw new RuntimeException(msg, e);
+    }
+  }
+
+  private void doMutate() {
+    Object[] result = new Object[mutations.size()];
+    try {
+      table.batch(mutations, result);
+    } catch (Exception e) {
+      if (e instanceof InterruptedException) {
+        Thread.currentThread().interrupt(); // re-assert the interrupt so callers can still observe it
+      }
+      String msg = String.format("'%d' HBase write(s) failed on table '%s'", size(mutations), tableName(table));
+      LOG.error(msg, e);
+      throw new RuntimeException(msg, e);
+    } finally {
+      mutations.clear();
+    }
+  }
+
+  private Put createPut(byte[] rowKey, HBaseWriterParams params) {
+    Put put = new Put(rowKey);
+    if(params.getTimeToLiveMillis() != null && params.getTimeToLiveMillis() > 0) { // a null TTL means no TTL
+      put.setTTL(params.getTimeToLiveMillis());
+    }
+    put.setDurability(params.getDurability());
+    return put;
+  }
+
+  private void addColumns(ColumnList cols, Put put) {
+    for (ColumnList.Column col: cols.getColumns()) {
+      if (col.getTs() > 0) {
+        put.addColumn(col.getFamily(), col.getQualifier(), col.getTs(), col.getValue());
+      } else {
+        put.addColumn(col.getFamily(), col.getQualifier(), col.getValue());
+      }
+    }
+  }
+
+  private void addColumns(ColumnList cols, Increment inc) {
+    cols.getCounters().forEach(cnt ->
+            inc.addColumn(cnt.getFamily(), cnt.getQualifier(), cnt.getIncrement()));
+  }
+
+  private Increment createIncrement(byte[] rowKey, HBaseWriterParams params) {
+    Increment inc = new Increment(rowKey);
+    if(params.getTimeToLiveMillis() != null && params.getTimeToLiveMillis() > 0) { // a null TTL means no TTL
+      inc.setTTL(params.getTimeToLiveMillis());
+    }
+    inc.setDurability(params.getDurability());
+    return inc;
+  }
+}
diff --git a/metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/client/HBaseTableClientFactory.java
b/metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/client/HBaseTableClientFactory.java
new file mode 100644
index 0000000..8697e72
--- /dev/null
+++ b/metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/client/HBaseTableClientFactory.java
@@ -0,0 +1,54 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.metron.hbase.client;
+
+import org.apache.hadoop.conf.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.invoke.MethodHandles;
+
+/**
+ * An {@link HBaseClientFactory} whose clients are {@link HBaseTableClient} instances.
+ */
+public class HBaseTableClientFactory implements HBaseClientFactory {
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  /**
+   * @param factory Used by the new client to open its connection to HBase.
+   * @param configuration The HBase configuration handed to the new client.
+   * @param tableName The name of the HBase table the client operates on.
+   * @return A new {@link HBaseTableClient} for the table.
+   * @throws RuntimeException If the client cannot be constructed; the original failure is the cause.
+   */
+  @Override
+  public HBaseClient create(HBaseConnectionFactory factory, Configuration configuration, String tableName) {
+    HBaseClient client;
+    try {
+      LOG.debug("Creating HBase client; table={}", tableName);
+      client = new HBaseTableClient(factory, configuration, tableName);
+    } catch (Exception cause) {
+      String message = String.format("Unable to open connection to HBase for table '%s'", tableName);
+      LOG.error(message, cause);
+      throw new RuntimeException(message, cause);
+    }
+    return client;
+  }
+}
diff --git a/metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/client/HBaseWriterParams.java b/metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/client/HBaseWriterParams.java
new file mode 100644
index 0000000..ec93177
--- /dev/null
+++ b/metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/client/HBaseWriterParams.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.hbase.client;
+
+import org.apache.hadoop.hbase.client.Durability;
+
+/**
+ * Parameters that define how the {@link HBaseWriter} writes to HBase.
+ */
+public class HBaseWriterParams {
+  private Durability durability = Durability.USE_DEFAULT;
+  // starts at 0; HBaseTableClient only applies a TTL to a write when this value is greater than 0
+  private Long timeToLiveMillis = 0L;
+
+  /** @param durability The durability of each write; {@code null} selects {@link Durability#USE_DEFAULT}. */
+  public HBaseWriterParams withDurability(Durability durability) {
+    this.durability = (durability == null) ? Durability.USE_DEFAULT : durability;
+    return this;
+  }
+
+  /** @param timeToLiveMillis The time to live in milliseconds; {@code null} is stored as 0. */
+  public HBaseWriterParams withTimeToLive(Long timeToLiveMillis) {
+    this.timeToLiveMillis = (timeToLiveMillis == null) ? 0L : timeToLiveMillis;
+    return this;
+  }
+
+  /** @return The durability to apply to each write; never {@code null}. */
+  public Durability getDurability() {
+    return durability;
+  }
+
+  /** @return The time to live in milliseconds; never {@code null}. */
+  public Long getTimeToLiveMillis() {
+    return timeToLiveMillis;
+  }
+}
diff --git a/metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/client/HBaseClient.java b/metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/client/LegacyHBaseClient.java
similarity index 98%
copy from metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/client/HBaseClient.java
copy to metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/client/LegacyHBaseClient.java
index d0d934e..d3f22c0 100644
--- a/metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/client/HBaseClient.java
+++ b/metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/client/LegacyHBaseClient.java
@@ -47,7 +47,7 @@ import org.slf4j.LoggerFactory;
 /**
  * A client that interacts with HBase.
*/ -public class HBaseClient implements Closeable { +public class LegacyHBaseClient implements Closeable { private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); @@ -66,7 +66,7 @@ public class HBaseClient implements Closeable { */ private HTableInterface table; - public HBaseClient(TableProvider provider, final Configuration configuration, final String tableName) { + public LegacyHBaseClient(TableProvider provider, final Configuration configuration, final String tableName) { this.mutations = new ArrayList<>(); this.gets = new ArrayList<>(); try { diff --git a/metron-platform/metron-hbase/metron-hbase-common/src/test/java/org/apache/metron/hbase/client/HBaseClientTest.java b/metron-platform/metron-hbase/metron-hbase-common/src/test/java/org/apache/metron/hbase/client/HBaseClientTest.java index 1983fc7..f9a9f1f 100644 --- a/metron-platform/metron-hbase/metron-hbase-common/src/test/java/org/apache/metron/hbase/client/HBaseClientTest.java +++ b/metron-platform/metron-hbase/metron-hbase-common/src/test/java/org/apache/metron/hbase/client/HBaseClientTest.java @@ -63,7 +63,7 @@ public class HBaseClientTest { private static final String tableName = "table"; private static HBaseTestingUtility util; - private static HBaseClient client; + private static LegacyHBaseClient client; private static HTableInterface table; private static Admin admin; private static byte[] cf = Bytes.toBytes("cf"); @@ -87,7 +87,7 @@ public class HBaseClientTest { table = util.createTable(Bytes.toBytes(tableName), cf); util.waitTableEnabled(table.getName()); // setup the client - client = new HBaseClient((c,t) -> table, table.getConfiguration(), tableName); + client = new LegacyHBaseClient((c, t) -> table, table.getConfiguration(), tableName); } @AfterClass @@ -259,7 +259,7 @@ public class HBaseClientTest { TableProvider tableProvider = mock(TableProvider.class); when(tableProvider.getTable(any(), any())).thenThrow(new IllegalArgumentException("test 
exception")); - client = new HBaseClient(tableProvider, HBaseConfiguration.create(), tableName); + client = new LegacyHBaseClient(tableProvider, HBaseConfiguration.create(), tableName); } @Test(expected = RuntimeException.class) @@ -271,7 +271,7 @@ public class HBaseClientTest { TableProvider tableProvider = mock(TableProvider.class); when(tableProvider.getTable(any(), any())).thenReturn(table); - client = new HBaseClient(tableProvider, HBaseConfiguration.create(), tableName); + client = new LegacyHBaseClient(tableProvider, HBaseConfiguration.create(), tableName); client.addMutation(rowKey1, cols1, Durability.SYNC_WAL); client.mutate(); } @@ -288,7 +288,7 @@ public class HBaseClientTest { HBaseProjectionCriteria criteria = new HBaseProjectionCriteria(); criteria.addColumnFamily(Bytes.toString(cf)); - client = new HBaseClient(tableProvider, HBaseConfiguration.create(), tableName); + client = new LegacyHBaseClient(tableProvider, HBaseConfiguration.create(), tableName); client.addGet(rowKey1, criteria); client.addGet(rowKey2, criteria); client.getAll(); diff --git a/metron-platform/metron-hbase/metron-hbase-common/src/test/java/org/apache/metron/hbase/client/integration/HBaseTableClientIntegrationTest.java b/metron-platform/metron-hbase/metron-hbase-common/src/test/java/org/apache/metron/hbase/client/integration/HBaseTableClientIntegrationTest.java new file mode 100644 index 0000000..d9db3cf --- /dev/null +++ b/metron-platform/metron-hbase/metron-hbase-common/src/test/java/org/apache/metron/hbase/client/integration/HBaseTableClientIntegrationTest.java @@ -0,0 +1,286 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.metron.hbase.client.integration;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.metron.hbase.ColumnList;
+import org.apache.metron.hbase.HBaseProjectionCriteria;
+import org.apache.metron.hbase.client.HBaseConnectionFactory;
+import org.apache.metron.hbase.client.HBaseTableClient;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+/**
+ * An integration test for the {@link HBaseTableClient}.
+ */
+public class HBaseTableClientIntegrationTest {
+  private static final String tableName = "widgets";
+  private static final String columnFamily = "W";
+  private static final byte[] columnFamilyB = Bytes.toBytes(columnFamily);
+  private static final String columnQualifier = "column";
+  private static final byte[] columnQualifierB = Bytes.toBytes(columnQualifier);
+  private static final String rowKey1String = "row-key-1";
+  private static final byte[] rowKey1 = Bytes.toBytes(rowKey1String);
+  private static final String rowKey2String = "row-key-2";
+  private static final byte[] rowKey2 = Bytes.toBytes(rowKey2String);
+  private static HBaseTestingUtility util;
+  private static Table table;
+  private HBaseTableClient client;
+
+  @BeforeClass
+  public static void startHBase() throws Exception {
+    Configuration config = HBaseConfiguration.create();
+    config.set("hbase.master.hostname", "localhost");
+    config.set("hbase.regionserver.hostname", "localhost");
+
+    util = new HBaseTestingUtility(config);
+    util.startMiniCluster();
+
+    // create the table
+    table = util.createTable(TableName.valueOf(tableName), columnFamily);
+    util.waitTableEnabled(table.getName());
+  }
+
+  @AfterClass
+  public static void stopHBase() throws Exception {
+    util.deleteTable(table.getName());
+    util.shutdownMiniCluster();
+    util.cleanupTestDir();
+  }
+
+  @Before
+  public void setup() throws IOException {
+    client = new HBaseTableClient(new HBaseConnectionFactory(), util.getConfiguration(), tableName);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    // delete all records in the table; the scanner is closed once the row keys are collected
+    List<Delete> deletions = new ArrayList<>();
+    try (ResultScanner scanner = table.getScanner(new Scan())) {
+      scanner.forEach(r -> deletions.add(new Delete(r.getRow())));
+    }
+    table.delete(deletions);
+    if(client != null) {
+      client.close();
+    }
+  }
+
+  @Test
+  public void testMutate() throws Exception {
+    // write some values
+    ColumnList columns = new ColumnList()
+            .addColumn(columnFamily, columnQualifier, "value1");
+    client.addMutation(rowKey1, columns, Durability.SKIP_WAL);
+    client.mutate();
+
+    // read back the value
+    client.addGet(rowKey1, new HBaseProjectionCriteria().addColumnFamily(columnFamily));
+    Result[] results = client.getAll();
+
+    // validate
+    assertEquals(1, results.length);
+    assertEquals("value1", getValue(results[0], columnFamily, columnQualifier));
+  }
+
+  @Test
+  public void testMutateMultipleColumns() throws Exception {
+    // write some values
+    ColumnList columns = new ColumnList()
+            .addColumn(columnFamily, "col1", "value1")
+            .addColumn(columnFamily, "col2", "value2");
+    client.addMutation(rowKey1, columns, Durability.SKIP_WAL);
+    client.mutate();
+
+    // read back the value
+    client.addGet(rowKey1, new HBaseProjectionCriteria().addColumnFamily(columnFamily));
+    Result[] results = client.getAll();
+
+    // validate
+    assertEquals(1, results.length);
+    assertEquals("value1", getValue(results[0], columnFamily, "col1"));
+    assertEquals("value2", getValue(results[0], columnFamily, "col2"));
+  }
+
+  @Test
+  public void testNoMutations() throws Exception {
+    // do not add any mutations before attempting to write
+    int count = client.mutate();
+    Assert.assertEquals(0, count);
+
+    // attempt to read
+    HBaseProjectionCriteria criteria = new HBaseProjectionCriteria().addColumnFamily(columnFamily);
+    client.addGet(rowKey1, criteria);
+    client.addGet(rowKey2, criteria);
+    Result[] results = client.getAll();
+
+    // nothing should have been read
+    assertEquals(2, results.length);
+    for(Result result : results) {
+      Assert.assertTrue(result.isEmpty());
+    }
+  }
+
+  @Test
+  public void testScan() throws Exception {
+    // write some values
+    client.addMutation(rowKey1, new ColumnList().addColumn(columnFamily, columnQualifier, "value1"), Durability.SKIP_WAL);
+    client.mutate();
+
+    // scan the table
+    Result[] results = client.scan(10);
+    assertEquals(1, results.length);
+
+    assertArrayEquals(rowKey1, results[0].getRow());
+    String actual1 = Bytes.toString(results[0].getValue(columnFamilyB, columnQualifierB));
+    assertEquals("value1", actual1);
+  }
+
+  @Test
+  public void testScanLimit() throws Exception {
+    // write some values
+    client.addMutation(rowKey1, new ColumnList().addColumn(columnFamily, columnQualifier, "value1"), Durability.SKIP_WAL);
+    client.addMutation(rowKey2, new ColumnList().addColumn(columnFamily, columnQualifier, "value2"), Durability.SKIP_WAL);
+    client.mutate();
+
+    // scan the table, but limit to 1 result
+    Result[] results = client.scan(1);
+    assertEquals(1, results.length);
+  }
+
+  @Test
+  public void testScanNothing() throws Exception {
+    // scan the table, but there is nothing there
+    Result[] results = client.scan(1);
+    assertEquals(0, results.length);
+  }
+
+  @Test
+  public void testScanRowKeys() throws Exception {
+    // write some values
+    client.addMutation(rowKey1, new ColumnList().addColumn(columnFamily, columnQualifier, "value1"), Durability.SKIP_WAL);
+    client.addMutation(rowKey2, new ColumnList().addColumn(columnFamily, columnQualifier, "value2"), Durability.SKIP_WAL);
+    client.mutate();
+
+    // scan the table
+    List<String> rowKeys = client.scanRowKeys();
+    List<String> expected = Arrays.asList(rowKey1String, rowKey2String);
+    assertEquals(new HashSet<>(expected), new HashSet<>(rowKeys));
+  }
+
+  @Test
+  public void testDelete() {
+    // write some values
+    client.addMutation(rowKey1, new ColumnList().addColumn(columnFamily, columnQualifier, "value1"), Durability.SKIP_WAL);
+    client.addMutation(rowKey2, new ColumnList().addColumn(columnFamily, columnQualifier, "value2"), Durability.SKIP_WAL);
+    client.mutate();
+
+    client.delete(rowKey1);
+
+    // the deleted row key should no longer exist
+    client.addGet(rowKey1, new HBaseProjectionCriteria().addColumnFamily(columnFamily));
+    Assert.assertTrue(client.getAll()[0].isEmpty());
+
+    // the other row key should remain
+    client.addGet(rowKey2, new HBaseProjectionCriteria().addColumnFamily(columnFamily));
+    Assert.assertFalse(client.getAll()[0].isEmpty());
+  }
+
+  @Test
+  public void testDeleteNothing() {
+    // nothing should blow-up if we attempt to delete something that does not exist
+    client.delete(rowKey1);
+  }
+
+  @Test
+  public void testDeleteColumn() {
+    // write some values
+    ColumnList columns = new ColumnList()
+            .addColumn(columnFamily, "col1", "value1")
+            .addColumn(columnFamily, "col2", "value2");
+    client.addMutation(rowKey1, columns, Durability.SKIP_WAL);
+    client.mutate();
+
+    // delete a column
+    client.delete(rowKey1, new ColumnList().addColumn(columnFamily, "col1"));
+
+    // read back the value
+    client.addGet(rowKey1, new HBaseProjectionCriteria().addColumnFamily(columnFamily));
+    Result[] results = client.getAll();
+
+    // validate
+    assertEquals(1, results.length);
+    assertNull(getValue(results[0], columnFamily, "col1"));
+    assertEquals("value2", getValue(results[0], columnFamily, "col2"));
+  }
+
+  @Test
+  public void testDeleteAllColumns() {
+    // write some values
+    ColumnList columns = new ColumnList()
+            .addColumn(columnFamily, "col1", "value1")
+            .addColumn(columnFamily, "col2", "value2");
+    client.addMutation(rowKey1, columns, Durability.SKIP_WAL);
+    client.mutate();
+
+    // delete both columns individually
+    client.delete(rowKey1, new ColumnList().addColumn(columnFamily, "col1"));
+    client.delete(rowKey1, new ColumnList().addColumn(columnFamily, "col2"));
+
+    // read back the value
+    client.addGet(rowKey1, new HBaseProjectionCriteria().addColumnFamily(columnFamily));
+    Result[] results = client.getAll();
+
+    // validate
+    assertEquals(1, results.length);
+    assertNull(getValue(results[0], columnFamily, "col1"));
+    assertNull(getValue(results[0], columnFamily, "col2"));
+  }
+
+  private String getValue(Result result, String columnFamily, String columnQualifier) {
+    byte[] value = result.getValue(Bytes.toBytes(columnFamily), Bytes.toBytes(columnQualifier));
+    return Bytes.toString(value);
+  }
+}
diff --git a/metron-platform/metron-pcap/pom.xml
b/metron-platform/metron-pcap/pom.xml index c839285..ffa7f3d 100644 --- a/metron-platform/metron-pcap/pom.xml +++ b/metron-platform/metron-pcap/pom.xml @@ -194,5 +194,10 @@ </exclusion> </exclusions> </dependency> + <dependency> + <groupId>org.apache.httpcomponents</groupId> + <artifactId>httpclient</artifactId> + <version>${global_httpclient_version}</version> + </dependency> </dependencies> </project> diff --git a/pom.xml b/pom.xml index 66a935e..f8c56ac 100644 --- a/pom.xml +++ b/pom.xml @@ -91,7 +91,6 @@ <base_flux_version>1.0.1</base_flux_version> <base_kafka_version>0.10.0</base_kafka_version> <base_hadoop_version>2.7.1</base_hadoop_version> - <base_hbase_version>1.1.1</base_hbase_version> <base_flume_version>1.5.2</base_flume_version> <!-- full dependency versions --> <global_accumulo_version>1.8.0</global_accumulo_version> @@ -106,14 +105,12 @@ <global_pcap_version>1.7.1</global_pcap_version> <global_kafka_version>0.10.0.1</global_kafka_version> <global_hadoop_version>${base_hadoop_version}</global_hadoop_version> - <global_hbase_version>${base_hbase_version}</global_hbase_version> <global_flume_version>${base_flume_version}</global_flume_version> <global_elasticsearch_version>5.6.14</global_elasticsearch_version> <global_json_simple_version>1.1.1</global_json_simple_version> <global_metrics_version>3.0.2</global_metrics_version> <global_junit_version>4.12</global_junit_version> <global_guava_version>17.0</global_guava_version> - <global_hbase_guava_version>12.0</global_hbase_guava_version> <global_json_schema_validator_version>2.2.5</global_json_schema_validator_version> <global_slf4j_version>1.7.7</global_slf4j_version> <global_opencsv_version>3.7</global_opencsv_version> @@ -139,9 +136,27 @@ <global_jacoco_version>0.8.3</global_jacoco_version> <argLine></argLine> </properties> - <profiles> <profile> + <id>HDP-3.1</id> + <properties> + <hdp_version>3.1.0.0</hdp_version> + <global_hbase_version>2.0.2</global_hbase_version> + 
<global_hbase_guava_version>17.0</global_hbase_guava_version> + </properties> + </profile> + <profile> + <id>HDP-2.6</id> + <activation> + <activeByDefault>true</activeByDefault> + </activation> + <properties> + <hdp_version>2.6.5.0</hdp_version> + <global_hbase_version>1.1.1</global_hbase_version> + <global_hbase_guava_version>12.0</global_hbase_guava_version> + </properties> + </profile> + <profile> <id>HDP-2.5.0.0</id> <properties> <hdp_version>2.5.0.0</hdp_version> @@ -149,10 +164,11 @@ <global_storm_kafka_version>1.2.2</global_storm_kafka_version> <global_storm_version>${base_storm_version}.${hdp_version}-${build_number}</global_storm_version> <global_kafka_version>${base_kafka_version}.${hdp_version}-${build_number}</global_kafka_version> + <global_hbase_version>1.1.1</global_hbase_version> + <global_hbase_guava_version>12.0</global_hbase_guava_version> </properties> </profile> </profiles> - <dependencyManagement> <dependencies> <dependency>