[
https://issues.apache.org/jira/browse/FLUME-2070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13683655#comment-13683655
]
Hari Shreedharan commented on FLUME-2070:
-----------------------------------------
Hi Wolfgang,
Great patch, thanks! Here are some relatively minor comments I have:
* This need not be synchronized, since only one thread (the sink runner) will
call process() on a particular sink:
{code:title=/org/apache/flume/sink/solr/morphline/MorphlineSolrSink.java:119}
public synchronized Status process() throws EventDeliveryException {
{code}
* This counter should be incremented every time indexer.process() is called
(though not all sinks even try to update this counter):
{code:title=/org/apache/flume/sink/solr/morphline/MorphlineSolrSink.java:155}
sinkCounter.addToEventDrainAttemptCount(numEventsTaken);
{code}
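A rough sketch of the placement I mean, with stubs standing in for Flume's SinkCounter and the morphline indexer (names and the surrounding loop are illustrative, not from the patch):

```java
import java.util.Arrays;
import java.util.List;

public class CounterSketch {
  // Stand-in for org.apache.flume.instrumentation.SinkCounter.
  static class SinkCounterStub {
    long drainAttempts;
    void incrementEventDrainAttemptCount() { drainAttempts++; }
  }

  // Stand-in for the morphline indexer.
  interface Indexer { void process(String event); }

  static long drainBatch(List<String> batch) {
    SinkCounterStub sinkCounter = new SinkCounterStub();
    Indexer indexer = event -> { /* morphline processing would happen here */ };
    for (String event : batch) {
      // Count each attempt right before handing the event to the indexer,
      // rather than adding a batch total afterwards.
      sinkCounter.incrementEventDrainAttemptCount();
      indexer.process(event);
    }
    return sinkCounter.drainAttempts;
  }

  public static void main(String[] args) {
    System.out.println(drainBatch(Arrays.asList("e1", "e2", "e3")));
  }
}
```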
* We should log the exceptions if the rollbacks (for the indexer or the
channel) fail:
{code:title=/org/apache/flume/sink/solr/morphline/MorphlineSolrSink.java:166}
try {
if (!isSolrTransactionCommitted) {
indexer.rollbackTransaction();
}
} catch (Throwable t2) {
; // ignore
} finally {
try {
txn.rollback();
} catch (Throwable t4) {
; // ignore
}
}
{code}
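Something along these lines, where a simple list stands in for the sink's logger and the Indexer/Transaction stubs stand in for the real Flume and morphline types (all names illustrative):

```java
import java.util.ArrayList;
import java.util.List;

public class RollbackSketch {
  interface Indexer { void rollbackTransaction(); }
  interface Transaction { void rollback(); }

  // Stand-in for an slf4j logger; the real code would use LOGGER.error(...).
  static final List<String> LOG = new ArrayList<>();

  static void rollbackWithLogging(Indexer indexer, Transaction txn,
                                  boolean isSolrTransactionCommitted) {
    try {
      if (!isSolrTransactionCommitted) {
        indexer.rollbackTransaction();
      }
    } catch (Throwable t) {
      LOG.add("Indexer rollback failed: " + t); // was: silently ignored
    } finally {
      try {
        txn.rollback();
      } catch (Throwable t) {
        LOG.add("Channel transaction rollback failed: " + t); // was: silently ignored
      }
    }
  }

  public static void main(String[] args) {
    rollbackWithLogging(
        () -> { throw new RuntimeException("solr down"); },
        () -> { throw new RuntimeException("channel closed"); },        false);
    System.out.println(LOG);
  }
}
```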
* Why not use a LinkedBlockingDeque or something like that here:
{code:title=org.apache.flume.sink.solr.morphline.MorphlineInterceptor.LocalMorphlineInterceptor}
private final List<LocalMorphlineInterceptor> pool = new ArrayList();
{code}
Also we might want to make sure we only spawn a maximum number of the
LocalMorphlineInterceptors?
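For example, something like this, where MAX_POOL_SIZE and the borrow/release method names are illustrative, not from the patch:

```java
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;

public class PoolSketch {
  static final int MAX_POOL_SIZE = 32; // illustrative cap

  static class LocalMorphlineInterceptor { } // stand-in for the real class

  // Bounded, thread-safe pool; no external synchronization needed.
  private final BlockingDeque<LocalMorphlineInterceptor> pool =
      new LinkedBlockingDeque<>(MAX_POOL_SIZE);

  LocalMorphlineInterceptor borrow() {
    LocalMorphlineInterceptor interceptor = pool.pollFirst();
    // Only spawn a new instance when none is available for reuse.
    return interceptor != null ? interceptor : new LocalMorphlineInterceptor();
  }

  void release(LocalMorphlineInterceptor interceptor) {
    // offerFirst() simply drops the instance when the pool is full,
    // bounding the number of pooled interceptors.
    pool.offerFirst(interceptor);
  }

  public static void main(String[] args) {
    PoolSketch sketch = new PoolSketch();
    LocalMorphlineInterceptor a = sketch.borrow();
    sketch.release(a);
    System.out.println(sketch.borrow() == a); // released instance is reused
  }
}
```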
* This array can be pre-allocated and reused no?
{code:title=/org/apache/flume/sink/solr/morphline/BlobDeserializer.java:77}
byte[] buf = new byte[Math.min(maxBlobLength, DEFAULT_BUFFER_SIZE)];
{code}
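I.e., allocate it once (e.g. in the constructor) and reuse it across calls, roughly like this (class name and DEFAULT_BUFFER_SIZE value are illustrative):

```java
public class BufferReuseSketch {
  private static final int DEFAULT_BUFFER_SIZE = 1024 * 64; // illustrative value

  // Allocated once and reused on every deserialization call,
  // instead of a fresh byte[] per event.
  private final byte[] buf;

  BufferReuseSketch(int maxBlobLength) {
    this.buf = new byte[Math.min(maxBlobLength, DEFAULT_BUFFER_SIZE)];
  }

  byte[] buffer() { return buf; } // same array on every call

  public static void main(String[] args) {
    BufferReuseSketch deser = new BufferReuseSketch(100);
    System.out.println(deser.buffer() == deser.buffer()); // no reallocation
    System.out.println(deser.buffer().length);
  }
}
```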
* This code seems to be repeated in BlobHandler and BlobDeserializer, maybe
move it to a util class or something?:
{code}
ByteArrayOutputStream blob = null;
byte[] buf = new byte[Math.min(maxBlobLength, DEFAULT_BUFFER_SIZE)];
int blobLength = 0;
int n = 0;
while ((n = in.read(buf, 0, Math.min(buf.length, maxBlobLength - blobLength))) != -1) {
if (blob == null) {
blob = new ByteArrayOutputStream(n);
}
blob.write(buf, 0, n);
blobLength += n;
if (blobLength >= maxBlobLength) {
LOGGER.warn("Request length exceeds maxBlobLength ({}), truncating BLOB event!", maxBlobLength);
break;
}
}
{code}
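The shared helper could look roughly like this; the BlobUtil class and readBlob() names are illustrative, and the truncation warning is left to the caller so each class keeps its own logger:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;

public class BlobUtil {
  /**
   * Reads at most maxBlobLength bytes from in into a blob, using buf as the
   * scratch read buffer. Returns null if the stream was already exhausted,
   * matching the lazy allocation in the original code.
   */
  static ByteArrayOutputStream readBlob(InputStream in, byte[] buf, int maxBlobLength)
      throws IOException {
    ByteArrayOutputStream blob = null;
    int blobLength = 0;
    int n;
    while ((n = in.read(buf, 0, Math.min(buf.length, maxBlobLength - blobLength))) != -1) {
      if (blob == null) {
        blob = new ByteArrayOutputStream(n);
      }
      blob.write(buf, 0, n);
      blobLength += n;
      if (blobLength >= maxBlobLength) {
        // Caller logs the "truncating BLOB event" warning with its own logger.
        break;
      }
    }
    return blob;
  }

  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream blob =
        readBlob(new java.io.ByteArrayInputStream(new byte[100]), new byte[16], 50);
    System.out.println(blob.size()); // truncated at maxBlobLength
  }
}
```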
Also, we should see if we can fix the build issue. Could you please rebase on
trunk as well?
> Add a Flume Morphline Solr Sink
> -------------------------------
>
> Key: FLUME-2070
> URL: https://issues.apache.org/jira/browse/FLUME-2070
> Project: Flume
> Issue Type: Improvement
> Components: Sinks+Sources
> Affects Versions: v1.3.1
> Reporter: wolfgang hoschek
> Fix For: v1.4.0
>
> Attachments: FLUME-2070-v1.patch, FLUME-2070-v2.patch
>
>
> Add a Flume Morphline Solr Sink that extracts search documents from Flume
> events, transforms them with a morphline and loads them in Near Real Time
> into Apache Solr, typically a SolrCloud.
> The sink is intended to be used alongside the HdfsSink. It is designed to
> extract, transform and load any data in flexible ways, not just structured
> data, but also arbitrary raw data, including data from many heterogeneous
> data sources.