Repository: storm
Updated Branches:
  refs/heads/master 4eaaa0bb1 -> e2ca82f8f
STORM-1030. Hive Connector Fixes.

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e5f0e91f
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e5f0e91f
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e5f0e91f

Branch: refs/heads/master
Commit: e5f0e91fcf7d377817982efd80f6e03d73dc371b
Parents: c2cf3be
Author: Sriharsha Chintalapani <[email protected]>
Authored: Mon Nov 9 16:40:34 2015 -0800
Committer: Sriharsha Chintalapani <[email protected]>
Committed: Mon Mar 28 13:05:52 2016 -0700

----------------------------------------------------------------------
 .../org/apache/storm/hive/bolt/HiveBolt.java    | 147 ++++++++------
 .../apache/storm/hive/common/HiveOptions.java   |   8 +-
 .../org/apache/storm/hive/common/HiveUtils.java |  11 +-
 .../apache/storm/hive/common/HiveWriter.java    | 127 ++++++++-----
 .../apache/storm/hive/trident/HiveState.java    |  38 ++--
 .../storm/hive/trident/HiveStateFactory.java    |   1 +
 .../apache/storm/hive/trident/HiveUpdater.java  |   1 +
 .../storm/hive/bolt/BucketTestHiveTopology.java | 190 +++++++++++++++++++
 .../apache/storm/hive/bolt/HiveTopology.java    |   6 +-
 .../apache/storm/hive/bolt/TestHiveBolt.java    |  11 +-
 .../storm/hive/common/TestHiveWriter.java       |  13 +-
 .../storm/hive/trident/TridentHiveTopology.java |   2 +-
 12 files changed, 415 insertions(+), 140 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/e5f0e91f/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java ---------------------------------------------------------------------- diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java b/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java index 0646dcb..ef06e4b 100644 --- a/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java +++ b/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java @@ -37,7 +37,7 @@ import org.slf4j.LoggerFactory; import java.util.List; import java.util.ArrayList; import java.util.Map; -import java.util.HashMap; +import java.util.concurrent.ConcurrentHashMap; import java.util.Timer; import java.util.TimerTask; import java.util.Map.Entry; @@ -56,14 +56,14 @@ public class HiveBolt extends BaseRichBolt { private ExecutorService callTimeoutPool; private transient Timer heartBeatTimer; private Boolean kerberosEnabled = false; - private AtomicBoolean timeToSendHeartBeat = new AtomicBoolean(false); + private AtomicBoolean sendHeartBeat = new AtomicBoolean(false); private UserGroupInformation ugi = null; - HashMap<HiveEndPoint, HiveWriter> allWriters; + private Map<HiveEndPoint, HiveWriter> allWriters; private List<Tuple> tupleBatch; public HiveBolt(HiveOptions options) { this.options = options; - tupleBatch = new LinkedList<>(); + tupleBatch = new LinkedList<Tuple>(); } @Override @@ -87,10 +87,12 @@ public class HiveBolt extends BaseRichBolt { } } this.collector = collector; - allWriters = new HashMap<HiveEndPoint,HiveWriter>(); + allWriters = new ConcurrentHashMap<HiveEndPoint,HiveWriter>(); String timeoutName = "hive-bolt-%d"; this.callTimeoutPool = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder().setNameFormat(timeoutName).build()); + + sendHeartBeat.set(true); heartBeatTimer = new Timer(); setupHeartBeatTimer(); @@ -105,44 +107,37 @@ public class HiveBolt extends BaseRichBolt { boolean forceFlush = false; if (TupleUtils.isTick(tuple)) { LOG.debug("TICK received! 
current batch status [{}/{}]", tupleBatch.size(), options.getBatchSize()); - collector.ack(tuple); forceFlush = true; - } - else { + } else { List<String> partitionVals = options.getMapper().mapPartitions(tuple); HiveEndPoint endPoint = HiveUtils.makeEndPoint(partitionVals, options); HiveWriter writer = getOrCreateWriter(endPoint); - if (timeToSendHeartBeat.compareAndSet(true, false)) { - enableHeartBeatOnAllWriters(); - } writer.write(options.getMapper().mapRecord(tuple)); tupleBatch.add(tuple); if (tupleBatch.size() >= options.getBatchSize()) forceFlush = true; } + if(forceFlush && !tupleBatch.isEmpty()) { flushAllWriters(true); LOG.info("acknowledging tuples after writers flushed "); - for(Tuple t : tupleBatch) + for(Tuple t : tupleBatch) { collector.ack(t); + } tupleBatch.clear(); } + } catch(SerializationError se) { + LOG.info("Serialization exception occurred, tuple [{}] is acknowledged but not written to Hive.", tuple); + this.collector.reportError(se); + collector.ack(tuple); } catch(Exception e) { this.collector.reportError(e); collector.fail(tuple); - try { - flushAndCloseWriters(); - LOG.info("acknowledging tuples after writers flushed and closed"); - for (Tuple t : tupleBatch) - collector.ack(t); - tupleBatch.clear(); - } catch (Exception e1) { - //If flushAndClose fails assume tuples are lost, do not ack - LOG.warn("Error while flushing and closing writers, tuples will NOT be acknowledged"); - for (Tuple t : tupleBatch) - collector.fail(t); - tupleBatch.clear(); + for (Tuple t : tupleBatch) { + collector.fail(t); } + tupleBatch.clear(); + abortAndCloseWriters(); } } @@ -153,13 +148,11 @@ public class HiveBolt extends BaseRichBolt { @Override public void cleanup() { + sendHeartBeat.set(false); for (Entry<HiveEndPoint, HiveWriter> entry : allWriters.entrySet()) { try { HiveWriter w = entry.getValue(); - LOG.info("Flushing writer to {}", w); - w.flush(false); - LOG.info("Closing writer to {}", w); - w.close(); + w.flushAndClose(); } catch (Exception ex) { LOG.warn("Error while closing writer to " + entry.getKey() + ". 
Exception follows.", ex); @@ -181,6 +174,7 @@ public class HiveBolt extends BaseRichBolt { LOG.warn("shutdown interrupted on " + execService, ex); } } + callTimeoutPool = null; super.cleanup(); LOG.info("Hive Bolt stopped"); @@ -188,8 +182,14 @@ public class HiveBolt extends BaseRichBolt { @Override public Map<String, Object> getComponentConfiguration() { - return TupleUtils.putTickFrequencyIntoComponentConfig(super.getComponentConfiguration(), - options.getTickTupleInterval()); + Map<String, Object> conf = super.getComponentConfiguration(); + if (conf == null) + conf = new Config(); + + if (options.getTickTupleInterval() > 0) + conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, options.getTickTupleInterval()); + + return conf; } private void setupHeartBeatTimer() { @@ -197,13 +197,26 @@ public class HiveBolt extends BaseRichBolt { heartBeatTimer.schedule(new TimerTask() { @Override public void run() { - timeToSendHeartBeat.set(true); - setupHeartBeatTimer(); + try { + if (sendHeartBeat.get()) { + LOG.debug("Start sending heartbeat on all writers"); + sendHeartBeatOnAllWriters(); + setupHeartBeatTimer(); + } + } catch (Exception e) { + LOG.warn("Failed to heartbeat on HiveWriter ", e); + } } }, options.getHeartBeatInterval() * 1000); } } + private void sendHeartBeatOnAllWriters() throws InterruptedException { + for (HiveWriter writer : allWriters.values()) { + writer.heartBeat(); + } + } + void flushAllWriters(boolean rollToNext) throws HiveWriter.CommitFailure, HiveWriter.TxnBatchFailure, HiveWriter.TxnFailure, InterruptedException { for(HiveWriter writer: allWriters.values()) { @@ -211,54 +224,60 @@ public class HiveBolt extends BaseRichBolt { } } - /** - * Closes all writers and remove them from cache - * @return number of writers retired - */ - private void closeAllWriters() { + void abortAndCloseWriters() { try { - //1) Retire writers - for (Entry<HiveEndPoint,HiveWriter> entry : allWriters.entrySet()) { - entry.getValue().close(); - } - //2) Clear cache - allWriters.clear(); - } catch(Exception e) { - LOG.warn("unable to close writers. ", e); + abortAllWriters(); + closeAllWriters(); + } catch(Exception ie) { + LOG.warn("unable to close hive connections. ", ie); } } - void flushAndCloseWriters() throws Exception { - try { - flushAllWriters(false); - } catch(Exception e) { - LOG.warn("unable to flush hive writers. ", e); - throw e; - } finally { - closeAllWriters(); + /** + * Abort current Txn on all writers + */ + private void abortAllWriters() throws InterruptedException, StreamingException, HiveWriter.TxnBatchFailure { + for (Entry<HiveEndPoint,HiveWriter> entry : allWriters.entrySet()) { + try { + entry.getValue().abort(); + } catch (Exception e) { + LOG.error("Failed to abort hive transaction batch, HiveEndPoint " + entry.getValue() +" due to exception ", e); + } } } - private void enableHeartBeatOnAllWriters() { - for (HiveWriter writer : allWriters.values()) { - writer.setHeartBeatNeeded(); + /** + * Closes all writers and remove them from cache + */ + private void closeAllWriters() { + //1) Retire writers + for (Entry<HiveEndPoint,HiveWriter> entry : allWriters.entrySet()) { + try { + entry.getValue().close(); + } catch(Exception e) { + LOG.warn("unable to close writers. 
", e); + } } + //2) Clear cache + allWriters.clear(); } private HiveWriter getOrCreateWriter(HiveEndPoint endPoint) throws HiveWriter.ConnectFailure, InterruptedException { try { HiveWriter writer = allWriters.get( endPoint ); - if( writer == null ) { + if (writer == null) { LOG.debug("Creating Writer to Hive end point : " + endPoint); writer = HiveUtils.makeHiveWriter(endPoint, callTimeoutPool, ugi, options); - if(allWriters.size() > options.getMaxOpenConnections()){ + if (allWriters.size() > (options.getMaxOpenConnections() - 1)) { + LOG.info("cached HiveEndPoint size {} exceeded maxOpenConnections {} ", allWriters.size(), options.getMaxOpenConnections()); int retired = retireIdleWriters(); if(retired==0) { retireEldestWriter(); } } allWriters.put(endPoint, writer); + HiveUtils.logAllHiveEndPoints(allWriters); } return writer; } catch (HiveWriter.ConnectFailure e) { @@ -271,22 +290,25 @@ public class HiveBolt extends BaseRichBolt { * Locate writer that has not been used for longest time and retire it */ private void retireEldestWriter() { + LOG.info("Attempting close eldest writers"); long oldestTimeStamp = System.currentTimeMillis(); HiveEndPoint eldest = null; for (Entry<HiveEndPoint,HiveWriter> entry : allWriters.entrySet()) { - if(entry.getValue().getLastUsed() < oldestTimeStamp) { + if (entry.getValue().getLastUsed() < oldestTimeStamp) { eldest = entry.getKey(); oldestTimeStamp = entry.getValue().getLastUsed(); } } try { LOG.info("Closing least used Writer to Hive end point : " + eldest); - allWriters.remove(eldest).close(); + allWriters.remove(eldest).flushAndClose(); } catch (IOException e) { LOG.warn("Failed to close writer for end point: " + eldest, e); } catch (InterruptedException e) { LOG.warn("Interrupted when attempting to close writer for end point: " + eldest, e); Thread.currentThread().interrupt(); + } catch (Exception e) { + LOG.warn("Interrupted when attempting to close writer for end point: " + eldest, e); } } @@ -295,6 +317,7 @@ public class HiveBolt extends BaseRichBolt { * @return number of writers retired */ private int retireIdleWriters() { + LOG.info("Attempting close idle writers"); int count = 0; long now = System.currentTimeMillis(); ArrayList<HiveEndPoint> retirees = new ArrayList<HiveEndPoint>(); @@ -310,12 +333,14 @@ public class HiveBolt extends BaseRichBolt { for(HiveEndPoint ep : retirees) { try { LOG.info("Closing idle Writer to Hive end point : {}", ep); - allWriters.remove(ep).close(); + allWriters.remove(ep).flushAndClose(); } catch (IOException e) { LOG.warn("Failed to close writer for end point: {}. 
Error: "+ ep, e); } catch (InterruptedException e) { LOG.warn("Interrupted when attempting to close writer for end point: " + ep, e); Thread.currentThread().interrupt(); + } catch (Exception e) { + LOG.warn("Interrupted when attempting to close writer for end point: " + ep, e); } } return count; http://git-wip-us.apache.org/repos/asf/storm/blob/e5f0e91f/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveOptions.java ---------------------------------------------------------------------- diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveOptions.java b/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveOptions.java index 8c2f55d..ab81a75 100644 --- a/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveOptions.java +++ b/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveOptions.java @@ -36,11 +36,11 @@ public class HiveOptions implements Serializable { protected String tableName; protected String metaStoreURI; protected Integer txnsPerBatch = 100; - protected Integer maxOpenConnections = 500; + protected Integer maxOpenConnections = 10; protected Integer batchSize = 15000; - protected Integer idleTimeout = 0; - protected Integer callTimeout = 10000; - protected Integer heartBeatInterval = 240; + protected Integer idleTimeout = 60000; + protected Integer callTimeout = 0; + protected Integer heartBeatInterval = 60; protected Boolean autoCreatePartitions = true; protected String kerberosPrincipal; protected String kerberosKeytab; http://git-wip-us.apache.org/repos/asf/storm/blob/e5f0e91f/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveUtils.java ---------------------------------------------------------------------- diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveUtils.java b/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveUtils.java index 5483b07..591d565 100644 --- a/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveUtils.java +++ b/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveUtils.java @@ -25,12 +25,17 @@ import org.apache.hive.hcatalog.streaming.*; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.util.List; +import java.util.Map; import java.util.concurrent.ExecutorService; import java.io.File; import java.io.IOException; public class HiveUtils { + private static final Logger LOG = LoggerFactory.getLogger(HiveUtils.class); public static HiveEndPoint makeEndPoint(List<String> partitionVals, HiveOptions options) throws ConnectionError { if(partitionVals==null) { @@ -72,5 +77,9 @@ public class HiveUtils { } } - + public static void logAllHiveEndPoints(Map<HiveEndPoint, HiveWriter> allWriters) { + for (Map.Entry<HiveEndPoint,HiveWriter> entry : allWriters.entrySet()) { + LOG.info("cached writers {} ", entry.getValue()); + } + } } http://git-wip-us.apache.org/repos/asf/storm/blob/e5f0e91f/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveWriter.java ---------------------------------------------------------------------- diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveWriter.java b/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveWriter.java index 233fec0..4df1c60 100644 --- a/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveWriter.java +++ 
http://git-wip-us.apache.org/repos/asf/storm/blob/e5f0e91f/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveWriter.java ---------------------------------------------------------------------- diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveWriter.java b/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveWriter.java index 233fec0..4df1c60 100644 --- a/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveWriter.java +++ b/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveWriter.java @@ -46,15 +46,15 @@ public class HiveWriter { private final StreamingConnection connection; private final int txnsPerBatch; private final RecordWriter recordWriter; - private TransactionBatch txnBatch; private final ExecutorService callTimeoutPool; private final long callTimeout; - + private final Object txnBatchLock = new Object(); + private TransactionBatch txnBatch; private long lastUsed; // time of last flush on this writer protected boolean closed; // flag indicating HiveWriter was closed private boolean autoCreatePartitions; - private boolean heartBeatNeeded = false; private UserGroupInformation ugi; + private int totalRecords = 0; public HiveWriter(HiveEndPoint endPoint, int txnsPerBatch, boolean autoCreatePartitions, long callTimeout, @@ -83,11 +83,9 @@ public class HiveWriter { @Override public String toString() { - return endPoint.toString(); - } - - public void setHeartBeatNeeded() { - heartBeatNeeded = true; + return "{ " + + "endPoint = " + endPoint.toString() + + ", TransactionBatch = " + txnBatch + " }"; } /** @@ -97,7 +95,7 @@ public class HiveWriter { * @throws InterruptedException */ public synchronized void write(final byte[] record) - throws WriteFailure, InterruptedException { + throws WriteFailure, SerializationError, InterruptedException { if (closed) { throw new IllegalStateException("This hive streaming writer was closed " + "and thus no longer able to write : " + endPoint); @@ -109,9 +107,12 @@ public class HiveWriter { @Override public Void call() throws StreamingException, InterruptedException { txnBatch.write(record); + totalRecords++; return null; } }); + } catch(SerializationError se) { + throw new SerializationError(endPoint.toString() + " SerializationError", se); } catch(StreamingException e) { throw new WriteFailure(endPoint, txnBatch.getCurrentTxnId(), e); } catch(TimeoutException e) { @@ -120,29 +121,20 @@ public class HiveWriter { } /** - * Commits the current Txn. + * Commits the current Txn if totalRecords > 0. 
* If 'rollToNext' is true, will switch to next Txn in batch or to a * new TxnBatch if current Txn batch is exhausted - * TODO: see what to do when there are errors in each IO call stage */ public void flush(boolean rollToNext) throws CommitFailure, TxnBatchFailure, TxnFailure, InterruptedException { - if(heartBeatNeeded) { - heartBeatNeeded = false; - heartBeat(); - } - lastUsed = System.currentTimeMillis(); + // if there are no records do not call flush + if (totalRecords <= 0) return; try { - commitTxn(); - if(txnBatch.remainingTransactions() == 0) { - closeTxnBatch(); - txnBatch = null; - if(rollToNext) { - txnBatch = nextTxnBatch(recordWriter); - } - } else if(rollToNext) { - LOG.debug("Switching to next Txn for {}", endPoint); - txnBatch.beginNextTransaction(); // does not block + synchronized(txnBatchLock) { + commitTxn(); + nextTxn(rollToNext); + totalRecords = 0; + lastUsed = System.currentTimeMillis(); } } catch(StreamingException e) { throw new TxnFailure(txnBatch, e); @@ -154,28 +146,46 @@ public class HiveWriter { */ public void heartBeat() throws InterruptedException { // 1) schedule the heartbeat on one thread in pool - try { - callWithTimeout(new CallRunner<Void>() { - @Override + synchronized(txnBatchLock) { + try { + callWithTimeout(new CallRunner<Void>() { + @Override public Void call() throws Exception { - try { - LOG.debug("Sending heartbeat on batch " + txnBatch); - txnBatch.heartbeat(); - } catch (StreamingException e) { - LOG.warn("Heartbeat error on batch " + txnBatch, e); + try { + LOG.info("Sending heartbeat on batch " + txnBatch); + txnBatch.heartbeat(); + } catch (StreamingException e) { + LOG.warn("Heartbeat error on batch " + txnBatch, e); + } + return null; } - return null; - } - }); - } catch (InterruptedException e) { - throw e; - } catch (Exception e) { - LOG.warn("Unable to send heartbeat on Txn Batch " + txnBatch, e); - // Suppressing exceptions as we don't care for errors on heartbeats + }); + } catch (InterruptedException e) { + throw e; + } catch (Exception e) { + LOG.warn("Unable to send heartbeat on Txn Batch " + txnBatch, e); + // Suppressing exceptions as we don't care for errors on heartbeats + } } } /** + * returns totalRecords written so far in a transaction + * @returns totalRecords + */ + public int getTotalRecords() { + return totalRecords; + } + + /** + * Flush and Close current transactionBatch. + */ + public void flushAndClose() throws TxnBatchFailure, TxnFailure, CommitFailure, + IOException, InterruptedException { + flush(false); + close(); + } + /** * Close the Transaction Batch and connection * @throws IOException * @throws InterruptedException @@ -246,8 +256,8 @@ public class HiveWriter { return connection.fetchTransactionBatch(txnsPerBatch, recordWriter); // could block } }); - batch.beginNextTransaction(); - LOG.debug("Acquired {}. Switching to first txn", batch); + batch.beginNextTransaction(); + LOG.debug("Acquired {}. Switching to first txn", batch); } catch(TimeoutException e) { throw new TxnBatchFailure(endPoint, e); } catch(StreamingException e) { @@ -279,10 +289,17 @@ public class HiveWriter { * Aborts the current Txn and switches to next Txn. * @throws StreamingException if could not get new Transaction Batch, or switch to next Txn */ - public void abort() throws InterruptedException { - abortTxn(); + public void abort() throws StreamingException, TxnBatchFailure, InterruptedException { + synchronized(txnBatchLock) { + abortTxn(); + nextTxn(true); // roll to next + } } + + /** + * Aborts current Txn in the txnBatch. 
+ */ private void abortTxn() throws InterruptedException { LOG.info("Aborting Txn id {} on End Point {}", txnBatch.getCurrentTxnId(), endPoint); try { @@ -305,6 +322,24 @@ public class HiveWriter { /** + * if there are remainingTransactions in current txnBatch, begins nextTransactions + * otherwise creates new txnBatch. + * @param boolean rollToNext + */ + private void nextTxn(boolean rollToNext) throws StreamingException, InterruptedException, TxnBatchFailure { + if(txnBatch.remainingTransactions() == 0) { + closeTxnBatch(); + txnBatch = null; + if(rollToNext) { + txnBatch = nextTxnBatch(recordWriter); + } + } else if(rollToNext) { + LOG.debug("Switching to next Txn for {}", endPoint); + txnBatch.beginNextTransaction(); // does not block + } + } + + /** * If the current thread has been interrupted, then throws an * exception. * @throws InterruptedException http://git-wip-us.apache.org/repos/asf/storm/blob/e5f0e91f/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveState.java ---------------------------------------------------------------------- diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveState.java b/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveState.java index dd296e4..08a5953 100644 --- a/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveState.java +++ b/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveState.java @@ -15,6 +15,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + + package org.apache.storm.hive.trident; import org.apache.storm.trident.operation.TridentCollector; @@ -38,7 +40,7 @@ import java.net.URI; import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.HashMap; +import java.util.concurrent.ConcurrentHashMap; import java.util.Timer; import java.util.TimerTask; import java.util.Map.Entry; @@ -55,9 +57,10 @@ public class HiveState implements State { private ExecutorService callTimeoutPool; private transient Timer heartBeatTimer; private AtomicBoolean timeToSendHeartBeat = new AtomicBoolean(false); + private Boolean sendHeartBeat = true; private UserGroupInformation ugi = null; private Boolean kerberosEnabled = false; - HashMap<HiveEndPoint, HiveWriter> allWriters; + private Map<HiveEndPoint, HiveWriter> allWriters; public HiveState(HiveOptions options) { this.options = options; @@ -93,7 +96,7 @@ public class HiveState implements State { } } - allWriters = new HashMap<HiveEndPoint,HiveWriter>(); + allWriters = new ConcurrentHashMap<HiveEndPoint,HiveWriter>(); String timeoutName = "hive-bolt-%d"; this.callTimeoutPool = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder().setNameFormat(timeoutName).build()); @@ -116,9 +119,6 @@ public class HiveState implements State { private void writeTuples(List<TridentTuple> tuples) throws Exception { - if(timeToSendHeartBeat.compareAndSet(true, false)) { - enableHeartBeatOnAllWriters(); - } for (TridentTuple tuple : tuples) { List<String> partitionVals = options.getMapper().mapPartitions(tuple); HiveEndPoint endPoint = HiveUtils.makeEndPoint(partitionVals, options); @@ -134,20 +134,18 @@ public class HiveState implements State { private void abortAndCloseWriters() { try { + sendHeartBeat = false; abortAllWriters(); closeAllWriters(); - } catch(InterruptedException e) { - LOG.warn("unable to close hive connections. ", e); - } catch(IOException ie) { + } catch(Exception ie) { LOG.warn("unable to close hive connections. 
", ie); } } /** * Abort current Txn on all writers - * @return number of writers retired */ - private void abortAllWriters() throws InterruptedException { + private void abortAllWriters() throws InterruptedException, StreamingException, HiveWriter.TxnBatchFailure { for (Entry<HiveEndPoint,HiveWriter> entry : allWriters.entrySet()) { entry.getValue().abort(); } @@ -172,8 +170,15 @@ public class HiveState implements State { heartBeatTimer.schedule(new TimerTask() { @Override public void run() { - timeToSendHeartBeat.set(true); - setupHeartBeatTimer(); + try { + if (sendHeartBeat) { + LOG.debug("Start sending heartbeat on all writers"); + sendHeartBeatOnAllWriters(); + setupHeartBeatTimer(); + } + } catch (Exception e) { + LOG.warn("Failed to heartbeat on HiveWriter ", e); + } } }, options.getHeartBeatInterval() * 1000); } @@ -186,9 +191,9 @@ public class HiveState implements State { } } - private void enableHeartBeatOnAllWriters() { + private void sendHeartBeatOnAllWriters() throws InterruptedException { for (HiveWriter writer : allWriters.values()) { - writer.setHeartBeatNeeded(); + writer.heartBeat(); } } @@ -199,7 +204,7 @@ public class HiveState implements State { if( writer == null ) { LOG.info("Creating Writer to Hive end point : " + endPoint); writer = HiveUtils.makeHiveWriter(endPoint, callTimeoutPool, ugi, options); - if(allWriters.size() > options.getMaxOpenConnections()){ + if(allWriters.size() > (options.getMaxOpenConnections() - 1)){ int retired = retireIdleWriters(); if(retired==0) { retireEldestWriter(); @@ -274,6 +279,7 @@ public class HiveState implements State { public void cleanup() { for (Entry<HiveEndPoint, HiveWriter> entry : allWriters.entrySet()) { try { + sendHeartBeat = false; HiveWriter w = entry.getValue(); LOG.info("Flushing writer to {}", w); w.flush(false); http://git-wip-us.apache.org/repos/asf/storm/blob/e5f0e91f/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveStateFactory.java ---------------------------------------------------------------------- diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveStateFactory.java b/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveStateFactory.java index 7e0e1f2..d6e3c71 100644 --- a/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveStateFactory.java +++ b/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveStateFactory.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.storm.hive.trident; import org.apache.storm.task.IMetricsContext; http://git-wip-us.apache.org/repos/asf/storm/blob/e5f0e91f/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveUpdater.java ---------------------------------------------------------------------- diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveUpdater.java b/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveUpdater.java index 82cfc15..062f7fb 100644 --- a/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveUpdater.java +++ b/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveUpdater.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ + package org.apache.storm.hive.trident; import org.apache.storm.trident.operation.TridentCollector; http://git-wip-us.apache.org/repos/asf/storm/blob/e5f0e91f/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/BucketTestHiveTopology.java ---------------------------------------------------------------------- diff --git a/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/BucketTestHiveTopology.java b/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/BucketTestHiveTopology.java new file mode 100644 index 0000000..607bd61 --- /dev/null +++ b/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/BucketTestHiveTopology.java @@ -0,0 +1,190 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.hive.bolt; + +import org.apache.storm.Config; +import org.apache.storm.LocalCluster; +import org.apache.storm.StormSubmitter; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.TopologyBuilder; +import org.apache.storm.topology.base.BaseRichSpout; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Values; +import org.apache.storm.utils.MockTupleHelpers; + +import org.apache.storm.hive.bolt.mapper.DelimitedRecordHiveMapper; +import org.apache.storm.hive.common.HiveOptions; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileReader; +import java.io.IOException; +import java.util.Arrays; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; + + +public class BucketTestHiveTopology { + static final String USER_SPOUT_ID = "user-spout"; + static final String BOLT_ID = "my-hive-bolt"; + static final String TOPOLOGY_NAME = "hive-test-topology1"; + + public static void main(String[] args) throws Exception { + if ((args == null) || (args.length < 7)) { + System.out.println("Usage: BucketTestHiveTopology metastoreURI " + + "dbName tableName dataFileLocation hiveBatchSize " + + "hiveTickTupleIntervalSecs workers [topologyName] [keytab file]" + + " [principal name] "); + System.exit(1); + } + String metaStoreURI = args[0]; + String dbName = args[1]; + String tblName = args[2]; + String sourceFileLocation = args[3]; + Integer hiveBatchSize = Integer.parseInt(args[4]); + Integer hiveTickTupleIntervalSecs = Integer.parseInt(args[5]); + Integer workers = Integer.parseInt(args[6]); + String[] colNames = { "ss_sold_date_sk", "ss_sold_time_sk", "ss_item_sk", + "ss_customer_sk", "ss_cdemo_sk", "ss_hdemo_sk", "ss_addr_sk", + "ss_store_sk", "ss_promo_sk", "ss_ticket_number", "ss_quantity", + "ss_wholesale_cost", "ss_list_price", "ss_sales_price", + "ss_ext_discount_amt", 
"ss_ext_sales_price", + "ss_ext_wholesale_cost", "ss_ext_list_price", "ss_ext_tax", + "ss_coupon_amt", "ss_net_paid", "ss_net_paid_inc_tax", + "ss_net_profit" }; + Config config = new Config(); + config.setNumWorkers(workers); + UserDataSpout spout = new UserDataSpout().withDataFile(sourceFileLocation); + DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper() + .withColumnFields(new Fields(colNames)).withTimeAsPartitionField("yyyy/MM/dd"); + HiveOptions hiveOptions; + hiveOptions = new HiveOptions(metaStoreURI,dbName,tblName,mapper) + .withTxnsPerBatch(10) + .withBatchSize(hiveBatchSize); + // doing below because its affecting storm metrics most likely + // had to make tick tuple a mandatory argument since its positional + if (hiveTickTupleIntervalSecs > 0) { + hiveOptions.withTickTupleInterval(hiveTickTupleIntervalSecs); + } + if (args.length == 10) { + hiveOptions.withKerberosKeytab(args[8]).withKerberosPrincipal(args[9]); + } + HiveBolt hiveBolt = new HiveBolt(hiveOptions); + TopologyBuilder builder = new TopologyBuilder(); + builder.setSpout(USER_SPOUT_ID, spout, 1); + // SentenceSpout --> MyBolt + builder.setBolt(BOLT_ID, hiveBolt, 14) + .shuffleGrouping(USER_SPOUT_ID); + if (args.length == 6) { + LocalCluster cluster = new LocalCluster(); + cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology()); + waitForSeconds(20); + cluster.killTopology(TOPOLOGY_NAME); + System.out.println("cluster begin to shutdown"); + cluster.shutdown(); + System.out.println("cluster shutdown"); + System.exit(0); + } else { + StormSubmitter.submitTopology(args[7], config, builder.createTopology()); + } + } + + public static void waitForSeconds(int seconds) { + try { + Thread.sleep(seconds * 1000); + } catch (InterruptedException e) { + } + } + + public static class UserDataSpout extends BaseRichSpout { + private ConcurrentHashMap<UUID, Values> pending; + private SpoutOutputCollector collector; + private String filePath; + private BufferedReader br; + private int count = 0; + private long total = 0L; + private String[] outputFields = { "ss_sold_date_sk", "ss_sold_time_sk", + "ss_item_sk", "ss_customer_sk", "ss_cdemo_sk", "ss_hdemo_sk", + "ss_addr_sk", "ss_store_sk", "ss_promo_sk", "ss_ticket_number", + "ss_quantity", "ss_wholesale_cost", "ss_list_price", + "ss_sales_price", "ss_ext_discount_amt", "ss_ext_sales_price", + "ss_ext_wholesale_cost", "ss_ext_list_price", "ss_ext_tax", + "ss_coupon_amt", "ss_net_paid", "ss_net_paid_inc_tax", + "ss_net_profit" }; + + public UserDataSpout withDataFile (String filePath) { + this.filePath = filePath; + return this; + } + + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields(this.outputFields)); + } + + public void open(Map config, TopologyContext context, + SpoutOutputCollector collector) { + this.collector = collector; + this.pending = new ConcurrentHashMap<UUID, Values>(); + try { + this.br = new BufferedReader(new FileReader(new File(this + .filePath))); + } catch (Exception ex) { + ex.printStackTrace(); + } + } + + public void nextTuple() { + String line; + try { + if ((line = br.readLine()) != null) { + System.out.println("*********" + line); + String[] values = line.split("\\|", -1); + // above gives an extra empty string at the end. 
below + // removes that + values = Arrays.copyOfRange(values, 0, + this.outputFields.length); + Values tupleValues = new Values(values); + UUID msgId = UUID.randomUUID(); + this.pending.put(msgId, tupleValues); + this.collector.emit(tupleValues, msgId); + count++; + total++; + if (count > 1000) { + count = 0; + System.out.println("Pending count: " + this.pending.size() + ", total: " + this.total); + } + } + } catch (IOException ex) { + ex.printStackTrace(); + } + } + + public void ack(Object msgId) { + this.pending.remove(msgId); + } + + public void fail(Object msgId) { + System.out.println("**** RESENDING FAILED TUPLE"); + this.collector.emit(this.pending.get(msgId), msgId); + } + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/e5f0e91f/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/HiveTopology.java ---------------------------------------------------------------------- diff --git a/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/HiveTopology.java b/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/HiveTopology.java index 1132587..8b61d5e 100644 --- a/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/HiveTopology.java +++ b/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/HiveTopology.java @@ -51,7 +51,8 @@ public class HiveTopology { config.setNumWorkers(1); UserDataSpout spout = new UserDataSpout(); DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper() - .withColumnFields(new Fields(colNames)); + .withTimeAsPartitionField("yyyy/MM/dd/hh") + .withColumnFields(new Fields(colNames)); HiveOptions hiveOptions; if (args.length == 6) { hiveOptions = new HiveOptions(metaStoreURI,dbName,tblName,mapper) @@ -64,7 +65,8 @@ public class HiveTopology { hiveOptions = new HiveOptions(metaStoreURI,dbName,tblName,mapper) .withTxnsPerBatch(10) .withBatchSize(100) - .withIdleTimeout(10); + .withIdleTimeout(10) + .withMaxOpenConnections(1); } HiveBolt hiveBolt = new HiveBolt(hiveOptions); http://git-wip-us.apache.org/repos/asf/storm/blob/e5f0e91f/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/TestHiveBolt.java ---------------------------------------------------------------------- diff --git a/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/TestHiveBolt.java b/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/TestHiveBolt.java index ead2c8d..0cf0084 100644 --- a/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/TestHiveBolt.java +++ b/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/TestHiveBolt.java @@ -204,7 +204,7 @@ public class TestHiveBolt { @Test public void testWithTimeformat() throws Exception { - String[] partNames1 = {"date"}; + String[] partNames1 = {"dt"}; String timeFormat = "yyyy/MM/dd"; HiveSetupUtil.dropDB(conf,dbName1); HiveSetupUtil.createDbAndTable(conf, dbName1, tblName1, null, @@ -213,8 +213,9 @@ public class TestHiveBolt { .withColumnFields(new Fields(colNames)) .withTimeAsPartitionField(timeFormat); HiveOptions hiveOptions = new HiveOptions(metaStoreURI,dbName1,tblName1,mapper) - .withTxnsPerBatch(2) - .withBatchSize(1); + .withTxnsPerBatch(2) + .withBatchSize(1) + .withMaxOpenConnections(1); bolt = new HiveBolt(hiveOptions); bolt.prepare(config,null,collector); Integer id = 100; @@ -319,7 +320,7 @@ public class TestHiveBolt { //This forces a failure of all the flush attempts doThrow(new InterruptedException()).when(spyBolt).flushAllWriters(true); - doThrow(new Exception()).when(spyBolt).flushAndCloseWriters(); + spyBolt.prepare(config, null, 
new OutputCollector(collector)); @@ -383,7 +384,7 @@ public class TestHiveBolt { //The tick should NOT cause any acks since the batch was empty except for acking itself Tuple mockTick = MockTupleHelpers.mockTickTuple(); bolt.execute(mockTick); - verify(collector).ack(mockTick); + verifyZeroInteractions(collector); bolt.cleanup(); } http://git-wip-us.apache.org/repos/asf/storm/blob/e5f0e91f/external/storm-hive/src/test/java/org/apache/storm/hive/common/TestHiveWriter.java ---------------------------------------------------------------------- diff --git a/external/storm-hive/src/test/java/org/apache/storm/hive/common/TestHiveWriter.java b/external/storm-hive/src/test/java/org/apache/storm/hive/common/TestHiveWriter.java index 952b0fb..e43fec6 100644 --- a/external/storm-hive/src/test/java/org/apache/storm/hive/common/TestHiveWriter.java +++ b/external/storm-hive/src/test/java/org/apache/storm/hive/common/TestHiveWriter.java @@ -27,6 +27,7 @@ import org.apache.hadoop.hive.ql.Driver; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hive.hcatalog.streaming.HiveEndPoint; import org.apache.hive.hcatalog.streaming.StreamingException; +import org.apache.hive.hcatalog.streaming.SerializationError; import org.apache.storm.hive.bolt.mapper.DelimitedRecordHiveMapper; import org.apache.storm.hive.bolt.mapper.HiveMapper; import org.apache.storm.hive.bolt.HiveSetupUtil; @@ -140,18 +141,22 @@ public class TestHiveWriter { , callTimeoutPool, mapper, ugi); Tuple tuple = generateTestTuple("1","abc"); writer.write(mapper.mapRecord(tuple)); + tuple = generateTestTuple("2","def"); + writer.write(mapper.mapRecord(tuple)); + Assert.assertEquals(writer.getTotalRecords(), 2); checkRecordCountInTable(dbName,tblName,0); writer.flush(true); + Assert.assertEquals(writer.getTotalRecords(), 0); - tuple = generateTestTuple("2","def"); + tuple = generateTestTuple("3","ghi"); writer.write(mapper.mapRecord(tuple)); writer.flush(true); - tuple = generateTestTuple("3","ghi"); + tuple = generateTestTuple("4","klm"); writer.write(mapper.mapRecord(tuple)); writer.flush(true); writer.close(); - checkRecordCountInTable(dbName,tblName,3); + checkRecordCountInTable(dbName,tblName,4); } private Tuple generateTestTuple(Object id, Object msg) { @@ -167,7 +172,7 @@ public class TestHiveWriter { } private void writeTuples(HiveWriter writer, HiveMapper mapper, int count) - throws HiveWriter.WriteFailure, InterruptedException { + throws HiveWriter.WriteFailure, InterruptedException, SerializationError { Integer id = 100; String msg = "test-123"; for (int i = 1; i <= count; i++) { http://git-wip-us.apache.org/repos/asf/storm/blob/e5f0e91f/external/storm-hive/src/test/java/org/apache/storm/hive/trident/TridentHiveTopology.java ---------------------------------------------------------------------- diff --git a/external/storm-hive/src/test/java/org/apache/storm/hive/trident/TridentHiveTopology.java b/external/storm-hive/src/test/java/org/apache/storm/hive/trident/TridentHiveTopology.java index 10921bc..d6c1d65 100644 --- a/external/storm-hive/src/test/java/org/apache/storm/hive/trident/TridentHiveTopology.java +++ b/external/storm-hive/src/test/java/org/apache/storm/hive/trident/TridentHiveTopology.java @@ -179,7 +179,7 @@ public class TridentHiveTopology { } @Override - public Map<String, Object> getComponentConfiguration() { + public Map getComponentConfiguration() { Config conf = new Config(); conf.setMaxTaskParallelism(1); return conf;
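Taken together, the changes surface a few tunables on HiveOptions. A minimal wiring sketch follows (illustrative only, not part of this commit; imports are elided, and the metastore URI, database, table, field list and data file path are placeholders):

    public class HiveBoltWiringSketch {
        public static void main(String[] args) throws Exception {
            DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
                .withColumnFields(new Fields("ss_sold_date_sk", "ss_sold_time_sk" /* ...rest of the spout's fields */))
                .withTimeAsPartitionField("yyyy/MM/dd");   // partition values map to HiveEndPoints
            HiveOptions hiveOptions = new HiveOptions("thrift://localhost:9083", "default", "stock_sales", mapper)
                .withTxnsPerBatch(10)
                .withBatchSize(1000)         // execute() force-flushes once this many tuples are buffered
                .withTickTupleInterval(10)   // tick tuples also flush (and ack) every 10 seconds
                .withMaxOpenConnections(1);  // above this, idle/eldest writers are flushAndClose()d
            TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout("user-spout", new BucketTestHiveTopology.UserDataSpout()
                .withDataFile("/tmp/store_sales.dat"), 1);
            builder.setBolt("my-hive-bolt", new HiveBolt(hiveOptions), 1).shuffleGrouping("user-spout");
            new LocalCluster().submitTopology("hive-sketch", new Config(), builder.createTopology());
        }
    }

With the new HiveOptions defaults (maxOpenConnections 10, idleTimeout 60000, heartBeatInterval 60), heartbeats now go out on the timer thread to every cached writer instead of being flagged for the next execute(), so writers on cold partitions keep their transaction batches alive.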
