This is an automated email from the ASF dual-hosted git repository.

srdo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git
commit 41fe57a9a65f8b0007d327f01442ecb8d0982521
Author: Stig Rohde Døssing <[email protected]>
AuthorDate: Wed Feb 27 13:26:06 2019 +0100

    STORM-3349: Upgrade Hadoop, Hive, HDFS, HBase to latest compatible versions
---
 .../storm/hbase/topology/WordCountClient.java      |  6 ++-
 examples/storm-jms-examples/pom.xml                |  2 +-
 external/storm-autocreds/pom.xml                   | 16 ++++++
 .../org/apache/storm/hbase/common/HBaseClient.java | 14 ++---
 .../java/org/apache/storm/hbase/common/Utils.java  | 12 +++--
 .../storm/hbase/trident/state/HBaseMapState.java   |  6 +--
 .../hbase/trident/windowing/HBaseWindowsStore.java | 21 ++++----
 external/storm-hive/pom.xml                        |  1 +
 .../storm/flux/examples/WordCountClient.java       |  6 ++-
 pom.xml                                            | 63 ++++++++++++++++++++--
 .../jvm/org/apache/storm/utils/JCQueueTest.java    | 39 +++++++-------
 11 files changed, 133 insertions(+), 53 deletions(-)

diff --git a/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/topology/WordCountClient.java b/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/topology/WordCountClient.java
index b7874bd..33acfab 100644
--- a/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/topology/WordCountClient.java
+++ b/examples/storm-hbase-examples/src/main/java/org/apache/storm/hbase/topology/WordCountClient.java
@@ -14,9 +14,11 @@ package org.apache.storm.hbase.topology;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.util.Bytes;
 
 /**
@@ -32,7 +34,7 @@ public class WordCountClient {
             config.set("hbase.rootdir", args[0]);
         }
 
-        HTable table = new HTable(config, "WordCount");
+        Table table = ConnectionFactory.createConnection(config).getTable(TableName.valueOf("WordCount"));
 
         for (String word : WordSpout.words) {
diff --git a/examples/storm-jms-examples/pom.xml b/examples/storm-jms-examples/pom.xml
index 10266c1..886066c 100644
--- a/examples/storm-jms-examples/pom.xml
+++ b/examples/storm-jms-examples/pom.xml
@@ -29,7 +29,7 @@
     <artifactId>storm-jms-examples</artifactId>
 
     <properties>
-        <spring.version>5.0.4.RELEASE</spring.version>
+        <spring.version>5.1.5.RELEASE</spring.version>
     </properties>
     <dependencies>
         <dependency>
diff --git a/external/storm-autocreds/pom.xml b/external/storm-autocreds/pom.xml
index 6c69740..d819145 100644
--- a/external/storm-autocreds/pom.xml
+++ b/external/storm-autocreds/pom.xml
@@ -108,6 +108,22 @@
             </exclusions>
         </dependency>
         <dependency>
+            <!-- Needed for TokenUtil, which moved here from hbase-client -->
+            <groupId>org.apache.hbase</groupId>
+            <artifactId>hbase-server</artifactId>
+            <version>${hbase.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.zookeeper</groupId>
+                    <artifactId>zookeeper</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
             <groupId>org.apache.hive.hcatalog</groupId>
             <artifactId>hive-hcatalog-streaming</artifactId>
             <version>${hive.version}</version>
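Note on the API change above: HBase 2.x removes the HTable constructors, so a Table handle must now be obtained from a Connection, as the WordCountClient change shows. A minimal standalone sketch of that pattern follows; the try-with-resources handling is an illustrative addition (the example clients in this commit keep the connection open for the life of the JVM), and the row key is hypothetical.

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.Get;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.util.Bytes;

    public class TableLookupSketch {
        public static void main(String[] args) throws IOException {
            Configuration config = HBaseConfiguration.create();
            // A Connection is heavyweight and thread-safe; open it once and
            // close it when done. Table handles are cheap, per-use objects.
            try (Connection connection = ConnectionFactory.createConnection(config);
                 Table table = connection.getTable(TableName.valueOf("WordCount"))) {
                Result result = table.get(new Get(Bytes.toBytes("apple")));
                System.out.println(result);
            }
        }
    }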
diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/HBaseClient.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/HBaseClient.java
index 208e622..2d0c324 100644
--- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/HBaseClient.java
+++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/HBaseClient.java
@@ -22,13 +22,13 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Increment;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.security.UserProvider;
 import org.apache.storm.hbase.bolt.mapper.HBaseProjectionCriteria;
 import org.apache.storm.hbase.security.HBaseSecurityUtil;
@@ -41,7 +41,7 @@ import org.slf4j.LoggerFactory;
 public class HBaseClient implements Closeable {
     private static final Logger LOG = LoggerFactory.getLogger(HBaseClient.class);
 
-    private HTable table;
+    private Table table;
 
     public HBaseClient(Map<String, Object> map, final Configuration configuration, final String tableName) {
         try {
@@ -60,14 +60,14 @@ public class HBaseClient implements Closeable {
             put.setDurability(durability);
             for (ColumnList.Column col : cols.getColumns()) {
                 if (col.getTs() > 0) {
-                    put.add(
+                    put.addColumn(
                         col.getFamily(),
                         col.getQualifier(),
                         col.getTs(),
                         col.getValue()
                     );
                 } else {
-                    put.add(
+                    put.addColumn(
                         col.getFamily(),
                         col.getQualifier(),
                         col.getValue()
@@ -82,9 +82,9 @@ public class HBaseClient implements Closeable {
             inc.setDurability(durability);
             for (ColumnList.Counter cnt : cols.getCounters()) {
                 inc.addColumn(
-                        cnt.getFamily(),
-                        cnt.getQualifier(),
-                        cnt.getIncrement()
+                    cnt.getFamily(),
+                    cnt.getQualifier(),
+                    cnt.getIncrement()
                 );
             }
             mutations.add(inc);
diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/Utils.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/Utils.java
index f9e6e34..defa092 100644
--- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/Utils.java
+++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/Utils.java
@@ -16,7 +16,9 @@ import java.io.IOException;
 import java.math.BigDecimal;
 import java.security.PrivilegedExceptionAction;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.security.UserProvider;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -31,7 +33,7 @@ public class Utils {
 
     private Utils() {}
 
-    public static HTable getTable(UserProvider provider, Configuration config, String tableName)
+    public static Table getTable(UserProvider provider, Configuration config, String tableName)
         throws IOException, InterruptedException {
         UserGroupInformation ugi;
         if (provider != null) {
@@ -74,10 +76,10 @@ public class Utils {
             }
         }
 
-        return ugi.doAs(new PrivilegedExceptionAction<HTable>() {
+        return ugi.doAs(new PrivilegedExceptionAction<Table>() {
             @Override
-            public HTable run() throws IOException {
-                return new HTable(config, tableName);
+            public Table run() throws IOException {
+                return ConnectionFactory.createConnection(config).getTable(TableName.valueOf(tableName));
             }
         });
     }
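A side note on Utils.getTable: the doAs wrapper opens the connection with the privileges of the resolved Kerberos or proxy user. On Java 8 the anonymous PrivilegedExceptionAction can be collapsed to a lambda; a hypothetical equivalent (getTableAs and its parameters are illustrative, not part of this commit):

    import java.io.IOException;
    import java.security.PrivilegedExceptionAction;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.security.UserGroupInformation;

    public class DoAsSketch {
        // Opens the table as the given UGI, mirroring the doAs block in
        // Utils.getTable above.
        static Table getTableAs(UserGroupInformation ugi, Configuration config, String tableName)
            throws IOException, InterruptedException {
            return ugi.doAs((PrivilegedExceptionAction<Table>) () ->
                ConnectionFactory.createConnection(config).getTable(TableName.valueOf(tableName)));
        }
    }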
diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseMapState.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseMapState.java
index d2a73d4..63169fb 100644
--- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseMapState.java
+++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseMapState.java
@@ -28,10 +28,10 @@ import java.util.Map;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.security.UserProvider;
 import org.apache.storm.hbase.common.Utils;
 import org.apache.storm.hbase.security.HBaseSecurityUtil;
@@ -72,7 +72,7 @@ public class HBaseMapState<T> implements IBackingMap<T> {
     private int partitionNum;
     private Options<T> options;
     private Serializer<T> serializer;
-    private HTable table;
+    private Table table;
 
     /**
      * Constructor.
@@ -183,7 +183,7 @@ public class HBaseMapState<T> implements IBackingMap<T> {
                           new Object[]{ this.partitionNum, new String(hbaseKey), new String(this.serializer.serialize(values.get(i))) });
                 Put put = new Put(hbaseKey);
                 T val = values.get(i);
-                put.add(this.options.columnFamily.getBytes(),
+                put.addColumn(this.options.columnFamily.getBytes(),
                     qualifier.getBytes(),
                     this.serializer.serialize(val));
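The put.add to put.addColumn rename above recurs throughout this migration: Put.add(byte[], byte[], byte[]) and its timestamped overload were removed in HBase 2.x, and addColumn takes the same arguments. A small self-contained sketch of the mapping (row, family, and qualifier values are made up):

    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;

    public class PutSketch {
        public static void main(String[] args) {
            Put put = new Put(Bytes.toBytes("row1"));
            // old: put.add(family, qualifier, value)
            put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("count"), Bytes.toBytes(1L));
            // old: put.add(family, qualifier, ts, value)
            put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("count"), System.currentTimeMillis(), Bytes.toBytes(2L));
        }
    }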
diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java
index 086b477..3fe4bc4 100644
--- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java
+++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java
@@ -23,12 +23,15 @@ import java.util.Map;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
 import org.apache.storm.trident.windowing.WindowKryoSerializer;
 import org.apache.storm.trident.windowing.WindowsStore;
@@ -41,9 +44,9 @@ import org.slf4j.LoggerFactory;
 public class HBaseWindowsStore implements WindowsStore {
     public static final String UTF_8 = "utf-8";
     private static final Logger LOG = LoggerFactory.getLogger(HBaseWindowsStore.class);
-    private final ThreadLocal<HTable> threadLocalHtable;
+    private final ThreadLocal<Table> threadLocalHtable;
     private final ThreadLocal<WindowKryoSerializer> threadLocalWindowKryoSerializer;
-    private final Queue<HTable> htables = new ConcurrentLinkedQueue<>();
+    private final Queue<Table> htables = new ConcurrentLinkedQueue<>();
     private final byte[] family;
     private final byte[] qualifier;
@@ -52,13 +55,13 @@ public class HBaseWindowsStore implements WindowsStore {
         this.family = family;
         this.qualifier = qualifier;
 
-        threadLocalHtable = new ThreadLocal<HTable>() {
+        threadLocalHtable = new ThreadLocal<Table>() {
             @Override
-            protected HTable initialValue() {
+            protected Table initialValue() {
                 try {
-                    HTable hTable = new HTable(config, tableName);
-                    htables.add(hTable);
-                    return hTable;
+                    Table table = ConnectionFactory.createConnection(config).getTable(TableName.valueOf(tableName));
+                    htables.add(table);
+                    return table;
                 } catch (IOException e) {
                     throw new RuntimeException(e);
                 }
@@ -74,7 +77,7 @@ public class HBaseWindowsStore implements WindowsStore {
     }
 
-    private HTable htable() {
+    private Table htable() {
         return threadLocalHtable.get();
     }
@@ -252,7 +255,7 @@ public class HBaseWindowsStore implements WindowsStore {
     @Override
     public void shutdown() {
         // close all the created hTable instances
-        for (HTable htable : htables) {
+        for (Table htable : htables) {
             try {
                 htable.close();
             } catch (IOException e) {
diff --git a/external/storm-hive/pom.xml b/external/storm-hive/pom.xml
index 7723916..2fd8ae5 100644
--- a/external/storm-hive/pom.xml
+++ b/external/storm-hive/pom.xml
@@ -154,6 +154,7 @@
             <artifactId>java-hamcrest</artifactId>
         </dependency>
         <dependency>
+            <!-- Needed in this version by hive -->
            <groupId>org.apache.thrift</groupId>
            <artifactId>libthrift</artifactId>
            <version>0.9.3</version>
diff --git a/flux/flux-examples/src/main/java/org/apache/storm/flux/examples/WordCountClient.java b/flux/flux-examples/src/main/java/org/apache/storm/flux/examples/WordCountClient.java
index 6e732ae..484d3c1 100644
--- a/flux/flux-examples/src/main/java/org/apache/storm/flux/examples/WordCountClient.java
+++ b/flux/flux-examples/src/main/java/org/apache/storm/flux/examples/WordCountClient.java
@@ -23,9 +23,11 @@ import java.util.Properties;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.util.Bytes;
 
 /**
@@ -61,7 +63,7 @@ public class WordCountClient {
             System.exit(1);
         }
 
-        HTable table = new HTable(config, "WordCount");
+        Table table = ConnectionFactory.createConnection(config).getTable(TableName.valueOf("WordCount"));
 
         String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"};
         for (String word : words) {
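For reference, the HBaseWindowsStore changes above keep the existing design: a Table handle is not thread-safe, so each thread lazily opens its own, and every handle is also queued so shutdown() can close them all. A condensed sketch of the pattern, assuming ThreadLocal.withInitial in place of the anonymous subclass (note that closing a Table does not close the Connection it came from, so this sketch, like the patched code, still leaves one connection open per thread):

    import java.io.IOException;
    import java.util.Queue;
    import java.util.concurrent.ConcurrentLinkedQueue;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.Table;

    public class ThreadLocalTableSketch {
        private final Queue<Table> tables = new ConcurrentLinkedQueue<>();
        private final ThreadLocal<Table> threadLocalTable;

        public ThreadLocalTableSketch(Configuration config, String tableName) {
            threadLocalTable = ThreadLocal.withInitial(() -> {
                try {
                    // One table handle per thread; remember it for shutdown().
                    Table table = ConnectionFactory.createConnection(config)
                        .getTable(TableName.valueOf(tableName));
                    tables.add(table);
                    return table;
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            });
        }

        public Table table() {
            return threadLocalTable.get();
        }

        public void shutdown() {
            for (Table table : tables) {
                try {
                    table.close();
                } catch (IOException e) {
                    // keep closing the remaining handles
                }
            }
        }
    }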
diff --git a/pom.xml b/pom.xml
index b8a8740..f359fc5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -287,10 +287,10 @@
         <mockito.version>2.19.0</mockito.version>
         <zookeeper.version>3.4.6</zookeeper.version>
         <jline.version>0.9.94</jline.version>
-        <hive.version>2.3.3</hive.version>
-        <hadoop.version>2.6.1</hadoop.version>
+        <hive.version>2.3.4</hive.version>
+        <hadoop.version>2.8.5</hadoop.version>
         <hdfs.version>${hadoop.version}</hdfs.version>
-        <hbase.version>1.4.4</hbase.version>
+        <hbase.version>2.1.3</hbase.version>
         <kryo.version>3.0.3</kryo.version>
         <servlet.version>3.1.0</servlet.version>
         <joda-time.version>2.3</joda-time.version>
@@ -301,7 +301,6 @@
         <hdrhistogram.version>2.1.10</hdrhistogram.version>
         <hamcrest.version>2.0.0.0</hamcrest.version>
         <cassandra.version>2.1.7</cassandra.version>
-        <druid.version>0.8.2</druid.version>
         <elasticsearch.version>5.2.2</elasticsearch.version>
         <calcite.version>1.14.0</calcite.version>
         <mongodb.version>3.2.0</mongodb.version>
@@ -1084,6 +1083,62 @@
                 </exclusion>
                 </exclusions>
             </dependency>
+            <!-- Hadoop dependencies. Specified here so HDFS/HBase don't import older versions of these jars in their projects-->
+            <dependency>
+                <groupId>org.apache.hadoop</groupId>
+                <artifactId>hadoop-auth</artifactId>
+                <version>${hadoop.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.hadoop</groupId>
+                <artifactId>hadoop-hdfs</artifactId>
+                <version>${hadoop.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.hadoop</groupId>
+                <artifactId>hadoop-common</artifactId>
+                <version>${hadoop.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.hadoop</groupId>
+                <artifactId>hadoop-annotations</artifactId>
+                <version>${hadoop.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.hadoop</groupId>
+                <artifactId>hadoop-mapreduce-client-core</artifactId>
+                <version>${hadoop.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.hadoop</groupId>
+                <artifactId>hadoop-yarn-common</artifactId>
+                <version>${hadoop.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.hadoop</groupId>
+                <artifactId>hadoop-yarn-server-resourcemanager</artifactId>
+                <version>${hadoop.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.hadoop</groupId>
+                <artifactId>hadoop-yarn-server-applicationhistoryservice</artifactId>
+                <version>${hadoop.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.hadoop</groupId>
+                <artifactId>hadoop-yarn-server-web-proxy</artifactId>
+                <version>${hadoop.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.hadoop</groupId>
+                <artifactId>hadoop-yarn-registry</artifactId>
+                <version>${hadoop.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.hadoop</groupId>
+                <artifactId>hadoop-archives</artifactId>
+                <version>${hadoop.version}</version>
+            </dependency>
         </dependencies>
     </dependencyManagement>
diff --git a/storm-client/test/jvm/org/apache/storm/utils/JCQueueTest.java b/storm-client/test/jvm/org/apache/storm/utils/JCQueueTest.java
index aa7714d..896f0c2 100644
--- a/storm-client/test/jvm/org/apache/storm/utils/JCQueueTest.java
+++ b/storm-client/test/jvm/org/apache/storm/utils/JCQueueTest.java
@@ -20,6 +20,7 @@ import org.apache.storm.policy.IWaitStrategy;
 import org.apache.storm.policy.WaitStrategyPark;
 import org.junit.Assert;
 import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.RepeatedTest;
 import org.junit.jupiter.api.Test;
 
 public class JCQueueTest {
@@ -31,34 +32,32 @@ public class JCQueueTest {
     @Test
     public void testFirstMessageFirst() throws InterruptedException {
         Assertions.assertTimeoutPreemptively(Duration.ofSeconds(10), () -> {
-            for (int i = 0; i < 100; i++) {
-                JCQueue queue = createQueue("firstMessageOrder", 16);
+            JCQueue queue = createQueue("firstMessageOrder", 16);
 
-                queue.publish("FIRST");
+            queue.publish("FIRST");
 
-                Runnable producer = new IncProducer(queue, i + 100, 1);
+            Runnable producer = new IncProducer(queue, 100, 1);
 
-                final AtomicReference<Object> result = new AtomicReference<>();
-                Runnable consumer = new ConsumerThd(queue, new JCQueue.Consumer() {
-                    private boolean head = true;
+            final AtomicReference<Object> result = new AtomicReference<>();
+            Runnable consumer = new ConsumerThd(queue, new JCQueue.Consumer() {
+                private boolean head = true;
 
-                    @Override
-                    public void accept(Object event) {
-                        if (head) {
-                            head = false;
-                            result.set(event);
-                        }
+                @Override
+                public void accept(Object event) {
+                    if (head) {
+                        head = false;
+                        result.set(event);
                     }
+                }
 
-                    @Override
-                    public void flush() {
-                    }
-                });
+                @Override
+                public void flush() {
+                }
+            });
 
-                run(producer, consumer, queue);
-                Assert.assertEquals("We expect to receive first published message first, but received " + result.get(),
+            run(producer, consumer, queue);
+            Assert.assertEquals("We expect to receive first published message first, but received " + result.get(),
                     "FIRST", result.get());
-            }
         });
     }
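The new RepeatedTest import hints at the direction of this test cleanup: hand-written repetition loops inside a single @Test give way to JUnit 5's annotation-driven repetition. A minimal illustration of the idiom (the test body is a placeholder, not the real queue setup from JCQueueTest):

    import java.time.Duration;
    import org.junit.jupiter.api.Assertions;
    import org.junit.jupiter.api.RepeatedTest;

    public class RepeatedTestSketch {
        // Runs the body 100 times, replacing a for loop inside one @Test.
        @RepeatedTest(100)
        public void firstMessageFirst() {
            Assertions.assertTimeoutPreemptively(Duration.ofSeconds(10), () -> {
                // create the queue, publish "FIRST", run producer/consumer,
                // and assert the first message received is "FIRST".
            });
        }
    }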
