[ 
https://issues.apache.org/jira/browse/STORM-1214?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15108316#comment-15108316
 ] 

ASF GitHub Bot commented on STORM-1214:
---------------------------------------

Github user satishd commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1029#discussion_r50234893
  
    --- Diff: storm-core/src/jvm/org/apache/storm/trident/Stream.java ---
    @@ -68,61 +85,160 @@ protected Stream(TridentTopology topology, String 
name, Node node) {
             _node = node;
             _name = name;
         }
    -    
    +
    +    /**
    +     * Applies a label to the stream. Naming a stream will append the 
label to the name of the bolt(s) created by
    +     * Trident and will be visible in the Storm UI.
    +     *
    +     * @param name - The label to apply to the stream
    +     * @return
    +     */
         public Stream name(String name) {
             return new Stream(_topology, name, _node);
         }
    -    
    +
    +    /**
    +     * Applies a parallelism hint to a stream.
    +     *
    +     * @param hint
    +     * @return
    +     */
         public Stream parallelismHint(int hint) {
             _node.parallelismHint = hint;
             return this;
         }
    -        
    +
    +    /**
    +     * Filters out fields from a stream, resulting in a Stream containing 
only the fields specified by `keepFields`.
    +     *
    +     * For example, if you had a Stream `mystream` containing the fields 
`["a", "b", "c", "d"]`, calling:
    +     *
    +     * ```java
    +     * mystream.project(new Fields("b", "d"))
    +     * ```
    +     *
    +     * would produce a stream containing only the fields `["b", "d"]`.
    +     *
    +     *
    +     * @param keepFields The fields in the Stream to keep
    +     * @return
    +     */
         public Stream project(Fields keepFields) {
             projectionValidation(keepFields);
             return _topology.addSourcedNode(this, new 
ProcessorNode(_topology.getUniqueStreamId(), _name, keepFields, new Fields(), 
new ProjectedProcessor(keepFields)));
         }
     
    +    /**
    +     * ## Grouping Operation
    +     *
    +     * @param fields
    +     * @return
    +     */
         public GroupedStream groupBy(Fields fields) {
             projectionValidation(fields);
             return new GroupedStream(this, fields);        
         }
    -    
    +
    +    /**
    +     * ## Repartitioning Operation
    +     *
    +     * @param fields
    +     * @return
    +     */
         public Stream partitionBy(Fields fields) {
             projectionValidation(fields);
             return partition(Grouping.fields(fields.toList()));
         }
    -    
    +
    +    /**
    +     * ## Repartitioning Operation
    +     *
    +     * @param partitioner
    +     * @return
    +     */
         public Stream partition(CustomStreamGrouping partitioner) {
             return 
partition(Grouping.custom_serialized(Utils.javaSerialize(partitioner)));
         }
    -    
    +
    +    /**
    +     * ## Repartitioning Operation
    +     *
    +     * Use random round robin algorithm to evenly redistribute tuples 
across all target partitions
    +     *
    +     * @return
    +     */
         public Stream shuffle() {
             return partition(Grouping.shuffle(new NullStruct()));
         }
     
    +    /**
    +     * ## Repartitioning Operation
    +     *
    +     * Use random round robin algorithm to evenly redistribute tuples 
across all target partitions, with a preference
    +     * for local tasks.
    +     *
    +     * @return
    +     */
         public Stream localOrShuffle() {
             return partition(Grouping.local_or_shuffle(new NullStruct()));
         }
    +
    +
    +    /**
    +     * ## Repartitioning Operation
    +     *
    +     * All tuples are sent to the same partition. The same partition is 
chosen for all batches in the stream.
    +     * @return
    +     */
         public Stream global() {
             // use this instead of storm's built in one so that we can specify 
a singleemitbatchtopartition
             // without knowledge of storm's internals
             return partition(new GlobalGrouping());
         }
    -    
    +
    +    /**
    +     * ## Repartitioning Operation
    +     *
    +     *  All tuples in the batch are sent to the same partition. Different 
batches in the stream may go to different
    +     *  partitions.
    +     *
    +     * @return
    +     */
         public Stream batchGlobal() {
             // the first field is the batch id
             return partition(new IndexHashGrouping(0));
         }
    -        
    +
    +    /**
    +     * ## Repartitioning Operation
    +     *
    +     * Every tuple is replicated to all target partitions. This can be useful 
during DRPC – for example, if you need to do
    +     * a stateQuery on every partition of data.
    +     *
    +     * @return
    +     */
         public Stream broadcast() {
             return partition(Grouping.all(new NullStruct()));
         }
    -    
    +
    +    /**
    +     * ## Repartitioning Operation
    +     *
    +     * @return
    +     */
         public Stream identityPartition() {
             return partition(new IdentityGrouping());
         }
    -    
    +
    +    /**
    +     * ## Repartitioning Operation
    +     *
    +     * This method takes in a custom partitioning function that implements
    +     * {@link backtype.storm.grouping.CustomStreamGrouping}
    --- End diff --
    
    The {@link} target should be org.apache.storm.grouping.CustomStreamGrouping, not backtype.storm.grouping.CustomStreamGrouping.


> Trident API Improvements
> ------------------------
>
>                 Key: STORM-1214
>                 URL: https://issues.apache.org/jira/browse/STORM-1214
>             Project: Apache Storm
>          Issue Type: Bug
>            Reporter: P. Taylor Goetz
>            Assignee: P. Taylor Goetz
>
> There are a few idiosyncrasies in the Trident API that can sometimes trip 
> developers up (e.g. when and how to set the parallelism of components). There 
> are also a few areas where the API could be made slightly more intuitive 
> (e.g. add Java 8 streams-like methods like {{filter()}}, {{map()}}, 
> {{flatMap()}}, etc.).
> Some of these concerns can be addressed through documentation, and some by 
> altering the API. Since we are approaching a 1.0 release, it would be good to 
> address any API changes before a major release.
> The goal of this JIRA is to identify specific areas of improvement and 
> formulate an implementation that addresses them.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to