[ https://issues.apache.org/jira/browse/STORM-1188?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16306326#comment-16306326 ]

Dan Blanchard commented on STORM-1188:
--------------------------------------

[~kabhwan]

I can't currently define a topology that uses a PartialKeyGrouping outside of
the JVM, but I would be able to if PartialKeyGrouping had an additional
constructor that took a list of strings instead of a Fields object. If such a
constructor were available, I could use PartialKeyGrouping via the
[streamparse topology DSL|http://streamparse.readthedocs.io/en/stable/topologies.html#topology-dsl]
like this:

{code:python}
from collections import Counter
from itertools import cycle

from streamparse import Bolt, Grouping, Spout, Topology


# In real life, this would go in another file for cleanliness
class WordSpout(Spout):
    outputs = ['word']

    def initialize(self, stormconf, context):
        self.words = cycle(['dog', 'cat', 'zebra', 'elephant'])

    def next_tuple(self):
        word = next(self.words)
        self.emit([word])


# In real life, this would go in another file for cleanliness
class WordCountBolt(Bolt):
    outputs = ['word', 'count']

    def initialize(self, conf, ctx):
        self.counter = Counter()
        self.total = 0

    def process(self, tup):
        word = tup.values.word
        # Increment "dog" by 10 (every other word by 1) so its count stands out
        inc_by = 10 if word == "dog" else 1
        self.counter[word] += inc_by
        self.total += inc_by
        if self.total % 1000 == 0:
            self.logger.info("counted %i words", self.total)
        self.emit([word, self.counter[word]])


class PartialKeyTestTopo(Topology):
    word_spout = WordSpout.spec(par=5)
    count_bolt = WordCountBolt.spec(inputs={
        word_spout: Grouping.custom_object(
            'org.apache.storm.grouping.PartialKeyGrouping', ['word'])})
{code}

With that additional constructor, no changes to the Thrift layer would be 
needed at all.
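
For illustration, here is a minimal, hypothetical Python model of the
storm.thrift structures a custom_object grouping is built from. It is not
streamparse's or Storm's actual code. In storm.thrift, custom_object is a
JavaObject (a full_class_name plus an args_list), and each argument is a
JavaObjectArg union whose members are limited to int, long, string, bool,
binary, and double values. The names JavaObject, to_java_arg, custom_object,
and the Fields stand-in below are illustrative assumptions: a string argument
such as 'word' encodes fine, while a Fields instance has no JavaObjectArg
member to travel in.

{code:python}
from dataclasses import dataclass, field
from typing import List, Tuple, Union

# Hypothetical stand-in for the value types a JavaObjectArg can hold.
Primitive = Union[bool, int, float, str, bytes]


@dataclass
class JavaObject:
    """Hypothetical mirror of storm.thrift's JavaObject struct."""
    full_class_name: str
    # Each entry is (JavaObjectArg member name, value).
    args_list: List[Tuple[str, Primitive]] = field(default_factory=list)


def to_java_arg(value):
    """Map one Python value onto a JavaObjectArg union member."""
    # Check bool before int: in Python, bool is a subclass of int.
    if isinstance(value, bool):
        return ("bool_arg", value)
    if isinstance(value, int):
        # Thrift distinguishes i32 (int_arg) from i64 (long_arg).
        if -2**31 <= value < 2**31:
            return ("int_arg", value)
        return ("long_arg", value)
    if isinstance(value, float):
        return ("double_arg", value)
    if isinstance(value, str):
        return ("string_arg", value)
    if isinstance(value, bytes):
        return ("binary_arg", value)
    # Anything else, such as a Java Fields object, has no member to use.
    raise TypeError(f"{type(value).__name__} cannot be sent as a JavaObjectArg")


def custom_object(full_class_name, arg_list):
    """Build a hypothetical custom_object grouping: one arg per list element."""
    return JavaObject(full_class_name, [to_java_arg(a) for a in arg_list])


class Fields:
    """Hypothetical Python stand-in for org.apache.storm.tuple.Fields."""
    def __init__(self, *names):
        self.names = list(names)


if __name__ == "__main__":
    pkg = "org.apache.storm.grouping.PartialKeyGrouping"
    print(custom_object(pkg, ["word"]).args_list)  # [('string_arg', 'word')]
    try:
        custom_object(pkg, [Fields("word")])
    except TypeError as err:
        print(err)  # Fields cannot be sent as a JavaObjectArg
{code}

The model keeps each argument as a (member name, value) pair to mirror the
Thrift union, where exactly one member is set per argument.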

> PartialKeyGrouping missing from storm.thrift (and can't use it via custom_object)
> ---------------------------------------------------------------------------------
>
>                 Key: STORM-1188
>                 URL: https://issues.apache.org/jira/browse/STORM-1188
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-core
>    Affects Versions: 0.10.0, 1.0.0, 0.10.1, 1.0.1, 0.10.2, 1.0.2, 1.1.0, 1.0.3, 1.x, 0.10.3, 1.0.4, 1.1.1, 1.0.5
>            Reporter: Dan Blanchard
>
> I'm working on a Python DSL for Storm to add to streamparse, and as part of
> that work I realized that the new partial key grouping was never added to the
> Grouping union in storm.thrift, so it's not usable outside of JVM-based
> topology definitions (at least not easily). My initial thought was to just
> use Grouping.custom_object, but the PartialKeyGrouping constructor takes a
> Fields object, which isn't a type defined in storm.thrift, so I can't use it.
> The fields grouping explicitly takes a list of strings in storm.thrift, so it
> seems PartialKeyGrouping needs to be added in the same way.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
