Repository: ambari Updated Branches: refs/heads/trunk 18d24e55e -> 0821a3111
AMBARI-17136. If Solr is down or not ready, then LogFeeder should retry (Bosco Durai via oleewere) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/0821a311 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/0821a311 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/0821a311 Branch: refs/heads/trunk Commit: 0821a3111e1e6e524328bb8a9e1c452f6127f00f Parents: 18d24e5 Author: oleewere <oleew...@gmail.com> Authored: Mon Jun 13 13:23:25 2016 +0200 Committer: oleewere <oleew...@gmail.com> Committed: Mon Jun 13 16:12:02 2016 +0200 ---------------------------------------------------------------------- .../ambari/logfeeder/output/OutputSolr.java | 102 ++++++++++--------- 1 file changed, 55 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/0821a311/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java index 6fb0b0e..b14c273 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java @@ -44,6 +44,7 @@ import org.apache.solr.client.solrj.impl.HttpSolrClient; import org.apache.solr.client.solrj.impl.LBHttpSolrClient; import org.apache.solr.client.solrj.response.SolrPingResponse; import org.apache.solr.client.solrj.response.UpdateResponse; +import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrInputDocument; public class OutputSolr extends 
Output { @@ -299,33 +300,21 @@ public class OutputSolr extends Output { if (localBuffer.size() > 0 && ((outputData == null && isDrain()) || (nextDispatchDuration <= 0 || localBuffer.size() >= maxBufferSize))) { - try { - if (isComputeCurrentCollection) { - // Compute the current router value - addRouterField(); - } - - addToSolr(outputData); - - resetLocalBuffer(); - lastDispatchTime = System.currentTimeMillis(); - } catch (IOException ioException) { - // Transient error, lets block till it is available - waitForSolr(); - } catch (Throwable serverException) { - // Clear the buffer - resetLocalBuffer(); - String logMessageKey = this.getClass().getSimpleName() + "_SOLR_UPDATE_EXCEPTION"; - LogFeederUtil.logErrorMessageByInterval(logMessageKey, - "Error sending log message to server. " + outputData, serverException, LOG, Level.ERROR); + boolean response = sendToSolr(outputData); + if( isDrain() && !response) { + //Since sending to Solr response failed and it is in draining mode, let's break; + LOG.warn("In drain mode and sending to Solr failed. So exiting. output=" + + getShortDescription()); + break; } + lastDispatchTime = currTimeMS; } } catch (InterruptedException e) { // Handle thread exiting } catch (Throwable t) { String logMessageKey = this.getClass().getSimpleName() + "_SOLR_MAINLOOP_EXCEPTION"; LogFeederUtil.logErrorMessageByInterval(logMessageKey, "Caught exception in main loop. " + outputData, t, LOG, - Level.ERROR); + Level.ERROR); } } @@ -335,6 +324,50 @@ public class OutputSolr extends Output { LOG.info("Exiting Solr worker thread. output=" + getShortDescription()); } + /** + * This will loop till Solr is available and LogFeeder is + * successfully able to write to the collection or shard. It will block till + * it can write. The outgoingBuffer is a BlockingQueue and when it is full, it + * will automatically stop parsing the log files. 
+ * @param outputData + * @return + */ + private boolean sendToSolr(OutputData outputData) { + boolean result = false; + while (!isDrain()) { + try { + if (isComputeCurrentCollection) { + // Compute the current router value + addRouterField(); + } + addToSolr(outputData); + resetLocalBuffer(); + //Send successful, will return + result = true; + break; + } catch (IOException | SolrException exception) { + // Transient error, lets block till it is available + try { + LOG.warn("Solr is not reachable. Going to retry after " + + RETRY_INTERVAL + " seconds. " + "output=" + + getShortDescription(), exception); + Thread.sleep(RETRY_INTERVAL * 1000); + } catch (Throwable t) { + // ignore + } + } catch (Throwable serverException) { + // Something unknown happened. Let's not block because of this error. + // Clear the buffer + String logMessageKey = this.getClass().getSimpleName() + "_SOLR_UPDATE_EXCEPTION"; + LogFeederUtil.logErrorMessageByInterval(logMessageKey, + "Error sending log message to server. Dropping logs", serverException, LOG, Level.ERROR); + resetLocalBuffer(); + break; + } + } + return result; + } + private OutputData getOutputData(long nextDispatchDuration) throws InterruptedException { OutputData outputData = outgoingBuffer.poll(); if (outputData == null && !isDrain() && nextDispatchDuration > 0) { @@ -380,7 +413,7 @@ public class OutputSolr extends Output { } for (SolrInputDocument solrInputDocument : localBuffer) { - solrInputDocument.addField(ROUTER_FIELD, shard); + solrInputDocument.setField(ROUTER_FIELD, shard); } } @@ -398,31 +431,6 @@ public class OutputSolr extends Output { } } - private void waitForSolr() { - while (!isDrain()) { - try { - LOG.warn( - "Solr is down. Going to sleep for " + RETRY_INTERVAL + " seconds. 
" + "output=" + getShortDescription()); - Thread.sleep(RETRY_INTERVAL * 1000); - } catch (Throwable t) { - // ignore - break; - } - if (isDrain()) { - break; - } - try { - SolrPingResponse pingResponse = solrClient.ping(); - if (pingResponse.getStatus() == 0) { - LOG.info("Solr seems to be up now. Resuming... output=" + getShortDescription()); - break; - } - } catch (Throwable t) { - // Ignore - } - } - } - private void closeSolrClient() { if (solrClient != null) { try { @@ -443,4 +451,4 @@ public class OutputSolr extends Output { return localBuffer.isEmpty(); } } -} \ No newline at end of file +}