[ 
https://issues.apache.org/jira/browse/FLUME-2070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13683655#comment-13683655
 ] 

Hari Shreedharan commented on FLUME-2070:
-----------------------------------------

Hi Wolfgang,

Great patch, thanks! Here are some relatively minor comments I have:

 * This need not be synchronized, since only one thread - the sink runner will 
call process on a particular sink:
{code:title=/org/apache/flume/sink/solr/morphline/MorphlineSolrSink.java:119}
  public synchronized Status process() throws EventDeliveryException {
{code}


* This counter should be incremented every time indexer.process() is called 
(though not all sinks even try to update this counter}
{code:title=/org/apache/flume/sink/solr/morphline/MorphlineSolrSink.java:155}
      sinkCounter.addToEventDrainAttemptCount(numEventsTaken);
{code}

* We should log the exceptions if the rollbacks (for the indexer or the channel 
failed}:
{code:title=/org/apache/flume/sink/solr/morphline/MorphlineSolrSink.java:166}
      try {
        if (!isSolrTransactionCommitted) {
          indexer.rollbackTransaction();
        }
      } catch (Throwable t2) {
        ; // ignore
      } finally {
        try {
          txn.rollback();
        } catch (Throwable t4) {
          ; // ignore
        }
      }
{code}

* Why not use a LinkedBlockingDeque or something like that here:
{code:title=org.apache.flume.sink.solr.morphline.MorphlineInterceptor.LocalMorphlineInterceptor}
private final List<LocalMorphlineInterceptor> pool = new ArrayList();
{code}

Also we might want to make sure we only spawn a maximum number of the 
LocalMorphlineInterceptors?  

* This array can be pre-allocated and reused no?
{code:title=/org/apache/flume/sink/solr/morphline/BlobDeserializer.java:77}
byte[] buf = new byte[Math.min(maxBlobLength, DEFAULT_BUFFER_SIZE)];
{code} 

* This code seems to be repeated in BlobHandler and BlobDeserializer, maybe 
move it to a util class or something?:
{code}
      ByteArrayOutputStream blob = null;
      byte[] buf = new byte[Math.min(maxBlobLength, DEFAULT_BUFFER_SIZE)];
      int blobLength = 0;
      int n = 0;
      while ((n = in.read(buf, 0, Math.min(buf.length, maxBlobLength - 
blobLength))) != -1) {
        if (blob == null) {
          blob = new ByteArrayOutputStream(n);
        }
        blob.write(buf, 0, n);
        blobLength += n;
        if (blobLength >= maxBlobLength) {
          LOGGER.warn("Request length exceeds maxBlobLength ({}), truncating 
BLOB event!", maxBlobLength);
          break;
        }
      }
{code}

Also, we should see if we can fix the build issue. Also, could you please 
rebase on trunk?
                
> Add a Flume Morphline Solr Sink
> -------------------------------
>
>                 Key: FLUME-2070
>                 URL: https://issues.apache.org/jira/browse/FLUME-2070
>             Project: Flume
>          Issue Type: Improvement
>          Components: Sinks+Sources
>    Affects Versions: v1.3.1
>            Reporter: wolfgang hoschek
>             Fix For: v1.4.0
>
>         Attachments: FLUME-2070-v1.patch, FLUME-2070-v2.patch
>
>
> Add a Flume Morphline Solr Sink that extracts search documents from Flume 
> events, transforms them with a morphline and loads them in Near Real Time 
> into Apache Solr, typically a SolrCloud. 
> The sink is intended to be used alongside the HdfsSink. It is designed to 
> extract, transform and load any data in flexible ways, not just structured 
> data, but also arbitrary raw data, including data from many heterogeneous 
> data sources.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

Reply via email to