[ https://issues.apache.org/jira/browse/PIG-3453?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jacob Perkins updated PIG-3453:
-------------------------------

        Fix Version/s: 0.13.0
             Assignee: Jacob Perkins
    Affects Version/s: 0.13.0
               Status: Patch Available  (was: Open)

Whew. Here's a patch that demonstrates running a word count end to end (e2e). It's
quite hefty, so here are some high-level points:

* Implemented two new operators, 'tap' and 'sink', with corresponding logical
operators LOTap and LOSink and interfaces TapFunc and SinkFunc. I did my best to
keep them general enough to work beyond the scope of Storm alone. It may make
sense to split just this part out into its own JIRA and patch.

* Implemented LocalFileTap and LocalFileSink (which really shouldn't be used
for more than simple testing) to demonstrate the TapFunc and SinkFunc interfaces.

* LogToTopologyTranslationVisitor - Much like LogToPhyTranslationVisitor for 
the physical plan, it walks the logical plan and creates a TridentTopology.

* LOForEach - I more or less copied what LogToPhyTranslationVisitor does. Since
POForEach is serializable, rather than parsing the logical expression plans
myself, I simply create the POForEach and wrap it in a Storm Trident
BaseFunction. It seemed a reasonably pragmatic approach for now.

* LOCogroup - I took a similar approach to LOForEach, except that, since
POPackage is tied so closely to Hadoop Writables, I implemented
StreamPackageFunction, which does something similar to what POPackage does.

* TridentExecutionEngine - This is probably the hackiest part. I'm not sure
what the best way to create a stats object for it is: the topology runs
continuously, so it never 'succeeds', and I don't want to fake POStores.

* Building and classpath - I did my best to avoid a dependency nightmare.
After applying the patch to trunk it should build fine. To run it you'll need
zookeeper-3.3.3.jar (no other version works) and storm-core-0.9.0-rc2.jar on
your classpath.

* Test script:

{code:title=wordcount.pig|borderStyle=solid}
set storm.executionengine.stream.batch.size 10000

data = tap '$sometext' using 
org.apache.pig.backend.storm.tap.LocalFileTap('line') as (line:chararray);

tokens = foreach data generate flatten(TOKENIZE(line)) as (token:chararray);

counts = foreach (group tokens by token) generate
                 group as token,
                 COUNT(tokens) as num;

sink counts into '$output' using 
org.apache.pig.backend.storm.sink.LocalFileSink('token');
{code}
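To make the tap/sink split from the first bullet concrete, here is a rough, dependency-free sketch of what a pair of source/sink contracts could look like. All names here (SimpleTap, SimpleSink, InMemoryTap, ListSink, TapSinkDemo, pump) are hypothetical stand-ins, not the TapFunc/SinkFunc signatures in the patch, and the in-memory classes only play the role that LocalFileTap/LocalFileSink play for testing. It assumes Java 9+ for List.of.

{code:java}
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical source contract: hands out up to maxSize records per batch.
interface SimpleTap<T> {
    List<T> nextBatch(int maxSize);
}

// Hypothetical sink contract: receives each processed batch.
interface SimpleSink<T> {
    void write(List<T> batch);
}

// In-memory tap standing in for a file-backed test tap.
class InMemoryTap<T> implements SimpleTap<T> {
    private final Iterator<T> records;
    InMemoryTap(List<T> source) { this.records = source.iterator(); }
    public List<T> nextBatch(int maxSize) {
        List<T> batch = new ArrayList<>();
        while (records.hasNext() && batch.size() < maxSize) {
            batch.add(records.next());
        }
        return batch;
    }
}

// Sink that simply collects everything it is given.
class ListSink<T> implements SimpleSink<T> {
    final List<T> received = new ArrayList<>();
    public void write(List<T> batch) { received.addAll(batch); }
}

class TapSinkDemo {
    // Moves batches from tap to sink unchanged and returns how many were moved.
    // It stops at the first empty batch, which suits a finite test tap; a real
    // continuous tap would treat an empty batch as "nothing available yet".
    static <T> int pump(SimpleTap<T> tap, SimpleSink<T> sink, int batchSize) {
        int batches = 0;
        for (List<T> batch = tap.nextBatch(batchSize); !batch.isEmpty(); batch = tap.nextBatch(batchSize)) {
            sink.write(batch);
            batches++;
        }
        return batches;
    }

    public static void main(String[] args) {
        InMemoryTap<String> tap = new InMemoryTap<>(List.of("a b", "b c", "c d"));
        ListSink<String> sink = new ListSink<>();
        int batches = pump(tap, sink, 2);
        System.out.println(batches + " batches: " + sink.received); // 2 batches: [a b, b c, c d]
    }
}
{code}

Keeping the contracts free of Storm types is one way to honour the goal of making tap/sink useful beyond Storm; the batch size plays the role of storm.executionengine.stream.batch.size in the script.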
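The LogToTopologyTranslationVisitor bullet describes a visitor walk over the logical plan that emits a topology. The toy sketch below only illustrates that pattern. The operator classes and TopologyTranslator are hypothetical, the plan is simplified to a linear list (real logical plans are DAGs walked in dependency order), and the output strings are labels loosely named after Trident stream operations, not calls into the Trident API. It assumes Java 9+.

{code:java}
import java.util.ArrayList;
import java.util.List;

// Hypothetical, heavily simplified logical operators.
abstract class LogicalOp {
    abstract void accept(PlanVisitor v);
}
class TapOp extends LogicalOp {
    final String location;
    TapOp(String location) { this.location = location; }
    void accept(PlanVisitor v) { v.visit(this); }
}
class ForEachOp extends LogicalOp {
    void accept(PlanVisitor v) { v.visit(this); }
}
class GroupOp extends LogicalOp {
    final String key;
    GroupOp(String key) { this.key = key; }
    void accept(PlanVisitor v) { v.visit(this); }
}
class SinkOp extends LogicalOp {
    final String location;
    SinkOp(String location) { this.location = location; }
    void accept(PlanVisitor v) { v.visit(this); }
}

// One visit method per operator type, as in a classic visitor.
interface PlanVisitor {
    void visit(TapOp op);
    void visit(ForEachOp op);
    void visit(GroupOp op);
    void visit(SinkOp op);
}

// Records a label for the stream stage each operator would become; a real
// translator would build topology objects here instead of strings.
class TopologyTranslator implements PlanVisitor {
    final List<String> stages = new ArrayList<>();
    public void visit(TapOp op)     { stages.add("spout(" + op.location + ")"); }
    public void visit(ForEachOp op) { stages.add("each"); }
    public void visit(GroupOp op)   { stages.add("groupBy(" + op.key + ")"); }
    public void visit(SinkOp op)    { stages.add("sink(" + op.location + ")"); }

    static List<String> translate(List<LogicalOp> plan) {
        TopologyTranslator t = new TopologyTranslator();
        for (LogicalOp op : plan) {
            op.accept(t);
        }
        return t.stages;
    }
}

class TranslateDemo {
    public static void main(String[] args) {
        // Roughly the shape of wordcount.pig: tap, foreach, group, foreach, sink.
        List<LogicalOp> wordCount = List.<LogicalOp>of(new TapOp("$sometext"), new ForEachOp(),
                new GroupOp("token"), new ForEachOp(), new SinkOp("$output"));
        System.out.println(TopologyTranslator.translate(wordCount));
        // [spout($sometext), each, groupBy(token), each, sink($output)]
    }
}
{code}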
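The LOForEach approach (reuse the already-serializable physical operator and wrap it in a stream function instead of re-interpreting the expression plans) is essentially an adapter. The following is a dependency-free sketch of that shape. TupleOperator, StreamFunction, OperatorFunction and ForEachDemo are hypothetical stand-ins for POForEach, Trident's BaseFunction and its collector, not their real signatures. The round trip through Java serialization mimics shipping the function to workers. It assumes Java 9+.

{code:java}
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// Stand-in for a physical operator: one input tuple -> zero or more output tuples.
interface TupleOperator extends Serializable {
    List<List<Object>> apply(List<Object> input);
}

// Stand-in for a stream function that emits results into a collector (a list here).
interface StreamFunction extends Serializable {
    void execute(List<Object> tuple, List<List<Object>> collector);
}

// Adapter: delegates to the wrapped operator, so its logic is not re-implemented.
class OperatorFunction implements StreamFunction {
    private final TupleOperator op;
    OperatorFunction(TupleOperator op) { this.op = op; }
    public void execute(List<Object> tuple, List<List<Object>> collector) {
        collector.addAll(op.apply(tuple));
    }
}

class ForEachDemo {
    // Mimics "generate flatten(TOKENIZE(line))": one output tuple per token.
    static class Tokenize implements TupleOperator {
        public List<List<Object>> apply(List<Object> input) {
            List<List<Object>> out = new ArrayList<>();
            for (String token : ((String) input.get(0)).split("\\s+")) {
                if (!token.isEmpty()) {
                    out.add(List.<Object>of(token));
                }
            }
            return out;
        }
    }

    // Serializes and deserializes an object, as a topology does when shipping code.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T obj) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        StreamFunction fn = roundTrip(new OperatorFunction(new Tokenize()));
        List<List<Object>> out = new ArrayList<>();
        fn.execute(List.<Object>of("the quick the"), out);
        System.out.println(out); // [[the], [quick], [the]]
    }
}
{code}

The trade-off is the one the bullet describes: the wrapper stays tiny because all per-tuple logic lives in the serializable operator, at the cost of coupling the stream function to the physical-plan classes.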
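For LOCogroup, the job POPackage does on the MapReduce side (turning keyed tuples into one (group, bag) per key) has to happen without Hadoop Writables. Below is a minimal per-batch sketch of that packaging step, followed by the COUNT from the word-count script. PackageDemo, pack and count are hypothetical names, this is not the patch's StreamPackageFunction, and it assumes grouping happens within a single batch, which may not match how the topology accumulates state across batches. It assumes Java 9+ for Map.entry.

{code:java}
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class PackageDemo {
    // Packages keyed tuples into one bag per key, keeping first-seen key order:
    // (k1, v1), (k2, v2), (k1, v3)  ->  {k1=[v1, v3], k2=[v2]}
    static <K, V> Map<K, List<V>> pack(List<Map.Entry<K, V>> keyed) {
        Map<K, List<V>> groups = new LinkedHashMap<>();
        for (Map.Entry<K, V> e : keyed) {
            groups.computeIfAbsent(e.getKey(), k -> new ArrayList<>()).add(e.getValue());
        }
        return groups;
    }

    // Like "generate group as token, COUNT(tokens) as num". This counts every
    // bag element, which matches Pig's COUNT only when no value is null.
    static <K, V> Map<K, Long> count(Map<K, List<V>> groups) {
        Map<K, Long> counts = new LinkedHashMap<>();
        groups.forEach((k, bag) -> counts.put(k, (long) bag.size()));
        return counts;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> keyed = new ArrayList<>();
        for (String token : "to be or not to be".split(" ")) {
            keyed.add(Map.entry(token, token)); // group tokens by token
        }
        System.out.println(count(pack(keyed))); // {to=2, be=2, or=1, not=1}
    }
}
{code}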

I'm sure there are more details than this. Again, it's a large patch, and rather
than continuing to polish it, I think it's time for feedback.

> Implement a Storm backend to Pig
> --------------------------------
>
>                 Key: PIG-3453
>                 URL: https://issues.apache.org/jira/browse/PIG-3453
>             Project: Pig
>          Issue Type: New Feature
>    Affects Versions: 0.13.0
>            Reporter: Pradeep Gollakota
>            Assignee: Jacob Perkins
>              Labels: storm
>             Fix For: 0.13.0
>
>         Attachments: storm-integration.patch
>
>
> There is a lot of interest around implementing a Storm backend to Pig for 
> stream processing. The proposal and initial discussions can be found at 
> https://cwiki.apache.org/confluence/display/PIG/Pig+on+Storm+Proposal



--
This message was sent by Atlassian JIRA
(v6.1#6144)
