Repository: ambari
Updated Branches:
  refs/heads/branch-2.4 8edd8fd3a -> 151c0fd57


AMBARI-17136. If Solr is down or not ready, then LogFeeder should retry
(Bosco Durai via oleewere)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/151c0fd5
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/151c0fd5
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/151c0fd5

Branch: refs/heads/branch-2.4
Commit: 151c0fd578ec3217293aceafa1f33b55b536a7ad
Parents: 8edd8fd
Author: oleewere <oleew...@gmail.com>
Authored: Mon Jun 13 13:23:25 2016 +0200
Committer: oleewere <oleew...@gmail.com>
Committed: Mon Jun 13 16:24:06 2016 +0200

----------------------------------------------------------------------
 .../ambari/logfeeder/output/OutputSolr.java     | 102 ++++++++++---------
 1 file changed, 55 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/151c0fd5/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java
----------------------------------------------------------------------
diff --git 
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java
 
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java
index 6fb0b0e..b14c273 100644
--- 
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java
+++ 
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java
@@ -44,6 +44,7 @@ import org.apache.solr.client.solrj.impl.HttpSolrClient;
 import org.apache.solr.client.solrj.impl.LBHttpSolrClient;
 import org.apache.solr.client.solrj.response.SolrPingResponse;
 import org.apache.solr.client.solrj.response.UpdateResponse;
+import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrInputDocument;
 
 public class OutputSolr extends Output {
@@ -299,33 +300,21 @@ public class OutputSolr extends Output {
 
           if (localBuffer.size() > 0 && ((outputData == null && isDrain())
               || (nextDispatchDuration <= 0 || localBuffer.size() >= 
maxBufferSize))) {
-            try {
-              if (isComputeCurrentCollection) {
-                // Compute the current router value
-                addRouterField();
-              }
-
-              addToSolr(outputData);
-
-              resetLocalBuffer();
-              lastDispatchTime = System.currentTimeMillis();
-            } catch (IOException ioException) {
-              // Transient error, lets block till it is available
-              waitForSolr();
-            } catch (Throwable serverException) {
-              // Clear the buffer
-              resetLocalBuffer();
-              String logMessageKey = this.getClass().getSimpleName() + 
"_SOLR_UPDATE_EXCEPTION";
-              LogFeederUtil.logErrorMessageByInterval(logMessageKey,
-                  "Error sending log message to server. " + outputData, 
serverException, LOG, Level.ERROR);
+            boolean response = sendToSolr(outputData);
+            if( isDrain() && !response) {
+              //Since sending to Solr response failed and it is in draining 
mode, let's break;
+              LOG.warn("In drain mode and sending to Solr failed. So exiting. 
output=" 
+                  + getShortDescription());
+              break;
             }
+            lastDispatchTime = currTimeMS;            
           }
         } catch (InterruptedException e) {
           // Handle thread exiting
         } catch (Throwable t) {
           String logMessageKey = this.getClass().getSimpleName() + 
"_SOLR_MAINLOOP_EXCEPTION";
           LogFeederUtil.logErrorMessageByInterval(logMessageKey, "Caught 
exception in main loop. " + outputData, t, LOG,
-              Level.ERROR);
+                Level.ERROR);
         }
       }
 
@@ -335,6 +324,50 @@ public class OutputSolr extends Output {
       LOG.info("Exiting Solr worker thread. output=" + getShortDescription());
     }
 
+    /**
+     * This will loop till Solr is available and LogFeeder is
+     * successfully able to write to the collection or shard. It will block 
till
+     * it can write. The outgoingBuffer is a BlockingQueue and when it is 
full, it
+     * will automatically stop parsing the log files.
+     * @param outputData 
+     * @return 
+     */
+    private boolean sendToSolr(OutputData outputData) {
+      boolean result = false;
+      while (!isDrain()) {
+        try {
+          if (isComputeCurrentCollection) {
+            // Compute the current router value
+            addRouterField();
+          }
+          addToSolr(outputData);
+          resetLocalBuffer();
+          //Send successful, will return 
+          result = true;
+          break;
+        } catch (IOException | SolrException exception) {
+          // Transient error, lets block till it is available
+          try {
+            LOG.warn("Solr is not reachable. Going to retry after "
+                + RETRY_INTERVAL + " seconds. " + "output="
+                + getShortDescription(), exception);
+            Thread.sleep(RETRY_INTERVAL * 1000);
+          } catch (Throwable t) {
+            // ignore
+          }
+        } catch (Throwable serverException) {
+          // Something unknown happened. Let's not block because of this 
error. 
+          // Clear the buffer
+          String logMessageKey = this.getClass().getSimpleName() + 
"_SOLR_UPDATE_EXCEPTION";
+          LogFeederUtil.logErrorMessageByInterval(logMessageKey,
+              "Error sending log message to server. Dropping logs", 
serverException, LOG, Level.ERROR);
+          resetLocalBuffer();
+          break;
+        }
+      } 
+      return result;
+    }
+
     private OutputData getOutputData(long nextDispatchDuration) throws 
InterruptedException {
       OutputData outputData = outgoingBuffer.poll();
       if (outputData == null && !isDrain() && nextDispatchDuration > 0) {
@@ -380,7 +413,7 @@ public class OutputSolr extends Output {
       }
 
       for (SolrInputDocument solrInputDocument : localBuffer) {
-        solrInputDocument.addField(ROUTER_FIELD, shard);
+        solrInputDocument.setField(ROUTER_FIELD, shard);
       }
     }
 
@@ -398,31 +431,6 @@ public class OutputSolr extends Output {
       }
     }
 
-    private void waitForSolr() {
-      while (!isDrain()) {
-        try {
-          LOG.warn(
-              "Solr is down. Going to sleep for " + RETRY_INTERVAL + " 
seconds. " + "output=" + getShortDescription());
-          Thread.sleep(RETRY_INTERVAL * 1000);
-        } catch (Throwable t) {
-          // ignore
-          break;
-        }
-        if (isDrain()) {
-          break;
-        }
-        try {
-          SolrPingResponse pingResponse = solrClient.ping();
-          if (pingResponse.getStatus() == 0) {
-            LOG.info("Solr seems to be up now. Resuming... output=" + 
getShortDescription());
-            break;
-          }
-        } catch (Throwable t) {
-          // Ignore
-        }
-      }
-    }
-
     private void closeSolrClient() {
       if (solrClient != null) {
         try {
@@ -443,4 +451,4 @@ public class OutputSolr extends Output {
       return localBuffer.isEmpty();
     }
   }
-}
\ No newline at end of file
+}

Reply via email to