[ 
https://issues.apache.org/jira/browse/SOLR-9240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15371958#comment-15371958
 ] 

Joel Bernstein edited comment on SOLR-9240 at 7/12/16 1:17 AM:
---------------------------------------------------------------

This ticket is looking fairly good. I did a round of manual testing with the 
expression below, which worked as expected.

{code}
parallel(
         workerCollection, 
         workers="2", 
         sort="_version_ desc", 
         daemon(
                  update(
                        updateCollection, 
                        batchSize=200, 
                        topic(
                            checkpointCollection,
                            topicCollection, 
                            q=*:*, 
                             id="topic40",
                             fl="id, to , from", 
                             partitionKeys="id",
                             initialCheckpoint="0")), 
               runInterval="1000", 
               id="test3"))
{code}

This expression sends a daemon expression to two worker nodes. The daemon wraps 
an update() expression, which in turn wraps a topic() expression. The topic has 
the new *initialCheckpoint* parameter, so it starts pulling records from 
checkpoint 0, which includes every record in the index that matches the topic 
query. The topic also has the *partitionKeys* parameter, so each worker pulls 
its own partition of the records that match the topic query.
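
To illustrate the partitioning idea, here is a minimal Python sketch of how 
records might be split across workers by a partition key. It is not Solr's 
implementation: the hash function (zlib.crc32) and the helper names 
worker_for_key and partition are assumptions made for illustration only. The 
point is that every record lands on exactly one worker, determined by the 
value of its key field.

```python
import zlib


def worker_for_key(key: str, num_workers: int) -> int:
    """Map a partition key value to a worker index.

    zlib.crc32 is a stand-in hash; the hash Solr actually uses is not
    stated in this ticket.
    """
    return zlib.crc32(key.encode("utf-8")) % num_workers


def partition(records, key_field, num_workers):
    """Group records by the worker that would receive them."""
    assignments = {worker: [] for worker in range(num_workers)}
    for record in records:
        worker = worker_for_key(str(record[key_field]), num_workers)
        assignments[worker].append(record)
    return assignments


# Ten hypothetical records with the fields from the fl list above.
records = [{"id": f"doc{i}", "to": "a", "from": "b"} for i in range(10)]
parts = partition(records, "id", 2)
```

Because the assignment depends only on the key value, the same record always 
goes to the same worker, and no record is handled twice.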

The daemon function will run the update() function iteratively. Each run will 
update the topic checkpoints for each worker.

The effect is that each worker iterates through its partition of the topic 
query, reindexing all the records that match the topic into another 
collection.
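
The per-worker iteration described above can be sketched roughly as follows. 
This is a simplified simulation, not Solr code: it assumes a single version 
number per checkpoint (real topic checkpoints are more involved), a "_" 
separator when appending the worker ID to the topic ID, and zlib.crc32 as a 
stand-in partition hash. The names checkpoint_id, in_partition and run_once 
are hypothetical.

```python
import zlib


def checkpoint_id(topic_id, worker_id):
    # The ticket proposes appending the worker ID to the topic ID;
    # the "_" separator is an assumption.
    return f"{topic_id}_{worker_id}"


def in_partition(record, worker_id, num_workers):
    # Stand-in for partitionKeys="id"; the real hash is not specified here.
    return zlib.crc32(record["id"].encode("utf-8")) % num_workers == worker_id


def run_once(index, target, checkpoints, topic_id, worker_id, num_workers, batch_size):
    """One daemon run for one worker: pull new records, reindex, advance checkpoint."""
    cp_key = checkpoint_id(topic_id, worker_id)
    last_version = checkpoints.get(cp_key, 0)  # 0 mirrors initialCheckpoint="0"
    # Records in this worker's partition that are newer than its checkpoint.
    pending = sorted(
        (r for r in index
         if r["_version_"] > last_version and in_partition(r, worker_id, num_workers)),
        key=lambda r: r["_version_"],
    )
    # Stand-in for update(): write to the target collection in batches.
    for start in range(0, len(pending), batch_size):
        batch = pending[start:start + batch_size]
        target.update({r["id"]: r for r in batch})
    if pending:
        checkpoints[cp_key] = pending[-1]["_version_"]
    return len(pending)


index = [{"id": f"doc{i}", "_version_": i + 1} for i in range(25)]
target, checkpoints = {}, {}
# Two passes stand in for two daemon runs on each of two workers.
first_pass = [run_once(index, target, checkpoints, "topic40", w, 2, 200) for w in range(2)]
second_pass = [run_once(index, target, checkpoints, "topic40", w, 2, 200) for w in range(2)]
```

In this sketch the first pass reindexes every record and the second pass finds 
nothing new, because each worker's checkpoint has already advanced past the 
records in its partition.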



> Support running the topic() Streaming Expression in parallel mode.
> ------------------------------------------------------------------
>
>                 Key: SOLR-9240
>                 URL: https://issues.apache.org/jira/browse/SOLR-9240
>             Project: Solr
>          Issue Type: Improvement
>            Reporter: Joel Bernstein
>            Assignee: Joel Bernstein
>         Attachments: SOLR-9240.patch, SOLR-9240.patch
>
>
> Currently the topic() function won't run in parallel mode because each worker 
> needs to maintain a separate set of checkpoints. The proposed solution is to 
> append the worker ID to the topic ID, which will cause each worker to have its 
> own checkpoints.
> It would be useful to support parallelizing the topic() function because it 
> would provide a general-purpose approach for processing text in parallel 
> across worker nodes.
> For example, this would allow a classify() function to be wrapped around a 
> topic() function to classify documents in parallel across worker nodes. 
> Sample syntax:
> {code}
> parallel(daemon(update(classify(topic(..., partitionKeys="id")))))
> {code}
> The example above would send a daemon to worker nodes that would classify all 
> documents returned by the topic() function. The update function would send 
> the output of classify() to a SolrCloud collection for indexing.
> The partitionKeys parameter would ensure that each worker would receive a 
> partition of the results returned by the topic() function. This allows the 
> classify() function to be run in parallel.


