[
https://issues.apache.org/jira/browse/PIG-3453?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Jacob Perkins updated PIG-3453:
-------------------------------
Fix Version/s: 0.13.0
Assignee: Jacob Perkins
Affects Version/s: 0.13.0
Status: Patch Available (was: Open)
Whew. Here's a patch that demonstrates running a word count end to end. It's
quite hefty, so here are some high-level points:
* Implemented two new operators, 'tap' and 'sink', with corresponding logical
operators LOTap and LOSink and interfaces TapFunc and SinkFunc. I did the best
I could to keep them general enough to work beyond the scope of just Storm.
It may make sense to split this part out into its own JIRA and patch.
* Implemented LocalFileTap and LocalFileSink (which really shouldn't be used
for more than simple testing) to demonstrate the TapFunc and SinkFunc.
* LogToTopologyTranslationVisitor - Much like LogToPhyTranslationVisitor for
the physical plan, it walks the logical plan and creates a TridentTopology.
* LOForEach - I more or less copied exactly what's being done in the
LogToPhyTranslationVisitor. Since POForEach is serializable, rather than
parsing the logical expression plans myself I simply create the POForEach and
wrap it with a Storm Trident BaseFunction. It seemed a reasonably pragmatic
approach for now.
* LOCogroup - I took a similar approach to LOForEach, except that, since
POPackage is tied so closely to Hadoop Writables, I implemented something
similar to what POPackage does in StreamPackageFunction.
* TridentExecutionEngine - This is probably the hackiest part. I'm not sure
what the best way to create a stats object for this is. The topology runs
continuously; it never 'succeeds'. I don't want to fake POStores.
* Building and classpath - I did the best I could to avoid a dependency
nightmare. After applying the patch to trunk it should build fine. To run it,
you'll need zookeeper-3.3.3.jar (no other version works) and
storm-core-0.9.0-rc2.jar on your classpath.
* test script:
{code:title=wordcount.pig|borderStyle=solid}
set storm.executionengine.stream.batch.size 10000
data = tap '$sometext'
    using org.apache.pig.backend.storm.tap.LocalFileTap('line') as (line:chararray);
tokens = foreach data generate flatten(TOKENIZE(line)) as (token:chararray);
counts = foreach (group tokens by token) generate
    group as token,
    COUNT(tokens) as num;
sink counts into '$output'
    using org.apache.pig.backend.storm.sink.LocalFileSink('token');
{code}
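To sanity-check the sink output on a small input file, the script's logic
(tokenize, group by token, count) can be reproduced in plain Java. This is only
a sketch and not part of the patch: the class and method names are made up for
illustration, and it splits on whitespace only, which merely approximates
TOKENIZE (TOKENIZE also splits on some punctuation). It also says nothing about
how counts are emitted across Storm batches.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical reference helper; nothing here exists in the patch.
class WordCountSketch {

    // Counts tokens across all lines. Splitting on whitespace is an
    // assumption that only approximates Pig's TOKENIZE.
    static Map<String, Long> count(List<String> lines) {
        Map<String, Long> counts = new TreeMap<>();
        for (String line : lines) {
            for (String token : line.trim().split("\\s+")) {
                if (token.isEmpty()) {
                    continue; // a blank line splits into one empty string
                }
                counts.merge(token, 1L, Long::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, Long> counts = count(Arrays.asList("a b a", "b c"));
        // Print token and count, tab separated, in token order.
        counts.forEach((token, num) -> System.out.println(token + "\t" + num));
    }
}
```

Comparing this against the contents of '$output' for the same input should
make it easy to spot tokens that are dropped or miscounted.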
I'm sure there are more details than this. Again, it's a large patch and,
rather than continuing to polish it, I think it's time for feedback.
> Implement a Storm backend to Pig
> --------------------------------
>
> Key: PIG-3453
> URL: https://issues.apache.org/jira/browse/PIG-3453
> Project: Pig
> Issue Type: New Feature
> Affects Versions: 0.13.0
> Reporter: Pradeep Gollakota
> Assignee: Jacob Perkins
> Labels: storm
> Fix For: 0.13.0
>
> Attachments: storm-integration.patch
>
>
> There is a lot of interest around implementing a Storm backend to Pig for
> streaming processing. The proposal and initial discussions can be found at
> https://cwiki.apache.org/confluence/display/PIG/Pig+on+Storm+Proposal