CHUKWA-797. Added retry logic for sending data to HBase. (Eric Yang)
Project: http://git-wip-us.apache.org/repos/asf/chukwa/repo Commit: http://git-wip-us.apache.org/repos/asf/chukwa/commit/fc706655 Tree: http://git-wip-us.apache.org/repos/asf/chukwa/tree/fc706655 Diff: http://git-wip-us.apache.org/repos/asf/chukwa/diff/fc706655 Branch: refs/heads/master Commit: fc70665574b7b54de583cd224552f032d8706633 Parents: 3f5bb3c Author: Eric Yang <[email protected]> Authored: Sat Mar 19 08:58:18 2016 -0700 Committer: Eric Yang <[email protected]> Committed: Sat Mar 19 08:58:18 2016 -0700 ---------------------------------------------------------------------- .../connector/PipelineConnector.java | 18 +++++++++--------- .../datacollection/writer/hbase/HBaseWriter.java | 11 ++++++++++- 2 files changed, 19 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/chukwa/blob/fc706655/src/main/java/org/apache/hadoop/chukwa/datacollection/connector/PipelineConnector.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/connector/PipelineConnector.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/connector/PipelineConnector.java index 929d871..bcb167a 100644 --- a/src/main/java/org/apache/hadoop/chukwa/datacollection/connector/PipelineConnector.java +++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/connector/PipelineConnector.java @@ -109,18 +109,20 @@ public class PipelineConnector implements Connector, Runnable { try { // get all ready chunks from the chunkQueue to be sent chunkQueue.collect(newQueue, MAX_SIZE_PER_POST); + CommitStatus result = writers.add(newQueue); + if(result.equals(ChukwaWriter.COMMIT_OK)) { + chunkCount = newQueue.size(); + for (Chunk c : newQueue) { + agent.reportCommit(c.getInitiator(), c.getSeqID()); + } + } + } catch (WriterException e) { + log.warn("PipelineStageWriter Exception: ", e); } catch (InterruptedException e) { log.warn("thread interrupted during addChunks(ChunkQueue)"); Thread.currentThread().interrupt(); break; } - CommitStatus result = writers.add(newQueue); - if(result.equals(ChukwaWriter.COMMIT_OK)) { - chunkCount = newQueue.size(); - for (Chunk c : newQueue) { - agent.reportCommit(c.getInitiator(), c.getSeqID()); - } - } long now = System.currentTimeMillis(); long delta = MIN_POST_INTERVAL - now + lastPost; if(delta > 0) { @@ -129,8 +131,6 @@ public class PipelineConnector implements Connector, Runnable { lastPost = now; } // end of try forever loop log.info("received stop() command so exiting run() loop to shutdown connector"); - } catch (WriterException e) { - log.warn("PipelineStageWriter Exception: ", e); } catch (OutOfMemoryError e) { log.warn("Bailing out", e); throw new RuntimeException("Shutdown pipeline connector."); http://git-wip-us.apache.org/repos/asf/chukwa/blob/fc706655/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/HBaseWriter.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/HBaseWriter.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/HBaseWriter.java index 621f22e..5ba87bd 100644 --- a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/HBaseWriter.java +++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/HBaseWriter.java @@ -54,6 +54,7 @@ public class HBaseWriter extends PipelineableWriter { private ArrayList<Put> output; private Reporter reporter; private ChukwaConfiguration conf; + private Configuration hconf; String defaultProcessor; private static Connection connection; @@ -92,6 +93,7 @@ public class HBaseWriter extends PipelineableWriter { private HBaseWriter(boolean reportStats, ChukwaConfiguration conf, Configuration hconf) throws IOException { this.reportStats = reportStats; this.conf = conf; + this.hconf = hconf; this.statTimer = new Timer(); this.defaultProcessor = conf.get( "chukwa.demux.mapper.default.processor", @@ -106,7 +108,7 @@ public class HBaseWriter extends PipelineableWriter { } catch (NoSuchAlgorithmException e) { throw new IOException("Can not register hashing algorithm."); } - if (connection == null) { + if (connection == null || connection.isClosed()) { connection = ConnectionFactory.createConnection(hconf); } } @@ -118,6 +120,13 @@ public class HBaseWriter extends PipelineableWriter { } public void init(Configuration conf) throws WriterException { + if (connection == null || connection.isClosed()) { + try { + connection = ConnectionFactory.createConnection(hconf); + } catch (IOException e) { + throw new WriterException("HBase is offline, retry later..."); + } + } } @Override
