[ https://issues.apache.org/jira/browse/STORM-1214?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15108316#comment-15108316 ]
ASF GitHub Bot commented on STORM-1214:
---------------------------------------
Github user satishd commented on a diff in the pull request:
https://github.com/apache/storm/pull/1029#discussion_r50234893
--- Diff: storm-core/src/jvm/org/apache/storm/trident/Stream.java ---
@@ -68,61 +85,160 @@ protected Stream(TridentTopology topology, String name, Node node) {
_node = node;
_name = name;
}
-
+
+ /**
+ * Applies a label to the stream. Naming a stream will append the label to the name of the bolt(s) created by
+ * Trident and will be visible in the Storm UI.
+ *
+ * @param name - The label to apply to the stream
+ * @return
+ */
public Stream name(String name) {
return new Stream(_topology, name, _node);
}
-
+
+ /**
+ * Applies a parallelism hint to a stream.
+ *
+ * @param hint
+ * @return
+ */
public Stream parallelismHint(int hint) {
_node.parallelismHint = hint;
return this;
}
-
+
+ /**
+ * Filters out fields from a stream, resulting in a Stream containing only the fields specified by `keepFields`.
+ *
+ * For example, if you had a Stream `mystream` containing the fields `["a", "b", "c", "d"]`, calling
+ *
+ * ```java
+ * mystream.project(new Fields("b", "d"))
+ * ```
+ *
+ * would produce a stream containing only the fields `["b", "d"]`.
+ *
+ *
+ * @param keepFields The fields in the Stream to keep
+ * @return
+ */
public Stream project(Fields keepFields) {
projectionValidation(keepFields);
return _topology.addSourcedNode(this, new ProcessorNode(_topology.getUniqueStreamId(), _name, keepFields, new Fields(), new ProjectedProcessor(keepFields)));
}
+ /**
+ * ## Grouping Operation
+ *
+ * @param fields
+ * @return
+ */
public GroupedStream groupBy(Fields fields) {
projectionValidation(fields);
return new GroupedStream(this, fields);
}
-
+
+ /**
+ * ## Repartitioning Operation
+ *
+ * @param fields
+ * @return
+ */
public Stream partitionBy(Fields fields) {
projectionValidation(fields);
return partition(Grouping.fields(fields.toList()));
}
-
+
+ /**
+ * ## Repartitioning Operation
+ *
+ * @param partitioner
+ * @return
+ */
public Stream partition(CustomStreamGrouping partitioner) {
return partition(Grouping.custom_serialized(Utils.javaSerialize(partitioner)));
}
-
+
+ /**
+ * ## Repartitioning Operation
+ *
+ * Use random round robin algorithm to evenly redistribute tuples across all target partitions
+ *
+ * @return
+ */
public Stream shuffle() {
return partition(Grouping.shuffle(new NullStruct()));
}
+ /**
+ * ## Repartitioning Operation
+ *
+ * Use random round robin algorithm to evenly redistribute tuples across all target partitions, with a preference
+ * for local tasks.
+ *
+ * @return
+ */
public Stream localOrShuffle() {
return partition(Grouping.local_or_shuffle(new NullStruct()));
}
+
+
+ /**
+ * ## Repartitioning Operation
+ *
+ * All tuples are sent to the same partition. The same partition is chosen for all batches in the stream.
+ * @return
+ */
public Stream global() {
// use this instead of storm's built in one so that we can specify a singleemitbatchtopartition
// without knowledge of storm's internals
return partition(new GlobalGrouping());
}
-
+
+ /**
+ * ## Repartitioning Operation
+ *
+ * All tuples in the batch are sent to the same partition. Different batches in the stream may go to different
+ * partitions.
+ *
+ * @return
+ */
public Stream batchGlobal() {
// the first field is the batch id
return partition(new IndexHashGrouping(0));
}
-
+
+ /**
+ * ## Repartitioning Operation
+ *
+ * Every tuple is replicated to all target partitions. This can be useful during DRPC – for example, if you need to do
+ * a stateQuery on every partition of data.
+ *
+ * @return
+ */
public Stream broadcast() {
return partition(Grouping.all(new NullStruct()));
}
-
+
+ /**
+ * ## Repartitioning Operation
+ *
+ * @return
+ */
public Stream identityPartition() {
return partition(new IdentityGrouping());
}
-
+
+ /**
+ * ## Repartitioning Operation
+ *
+ * This method takes in a custom partitioning function that implements
+ * {@link backtype.storm.grouping.CustomStreamGrouping}
--- End diff --
It should be `org.apache.storm.grouping.CustomStreamGrouping`, since the `backtype.storm` packages have been renamed to `org.apache.storm`.
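To make the repartitioning Javadoc above concrete, here is a minimal, JDK-only sketch of what each operation decides for a single tuple. It is a simplified model, not Storm's code: `PartitioningModel`, `Partitioner`, `choose`, `roundRobin` and `byFields` are hypothetical names, and `Partitioner` only plays the role that `CustomStreamGrouping` plays for `partition(...)`. Storm's actual shuffle randomizes its order, and its `GlobalGrouping`/`IndexHashGrouping` may pick partition indices differently; the sketch only mirrors the described behavior (one fixed partition, one partition per batch, every partition, hash of selected fields).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

/**
 * Hypothetical, simplified model of Trident's repartitioning operations.
 * A "tuple" is a List<Object>; a partitioner returns the indices of the
 * target partitions that should receive it. This is not Storm's code.
 */
public class PartitioningModel {

    /** Hypothetical stand-in for a custom grouping's task selection. */
    public interface Partitioner {
        List<Integer> choose(List<Object> tuple, int numPartitions);
    }

    /** shuffle(): plain round robin (Storm's version also randomizes). */
    public static Partitioner roundRobin() {
        int[] next = {0};
        return (tuple, n) -> Collections.singletonList(Math.floorMod(next[0]++, n));
    }

    /** global(): every tuple of every batch goes to the same partition. */
    public static Partitioner global() {
        return (tuple, n) -> Collections.singletonList(0);
    }

    /** batchGlobal(): hash field 0 (the batch id) so a whole batch stays together. */
    public static Partitioner batchGlobal() {
        return (tuple, n) -> Collections.singletonList(
                Math.floorMod(Objects.hashCode(tuple.get(0)), n));
    }

    /** broadcast(): replicate the tuple to every partition. */
    public static Partitioner broadcast() {
        return (tuple, n) -> {
            List<Integer> all = new ArrayList<>();
            for (int i = 0; i < n; i++) {
                all.add(i);
            }
            return all;
        };
    }

    /** partitionBy(fields): hash only the values at the selected positions. */
    public static Partitioner byFields(int... indices) {
        return (tuple, n) -> {
            List<Object> key = new ArrayList<>();
            for (int i : indices) {
                key.add(tuple.get(i));
            }
            return Collections.singletonList(Math.floorMod(key.hashCode(), n));
        };
    }

    public static void main(String[] args) {
        List<Object> a = Arrays.asList("batch-1", "user-a", 10);
        List<Object> b = Arrays.asList("batch-1", "user-b", 20);
        Partitioner rr = roundRobin();
        System.out.println("shuffle:     " + rr.choose(a, 3) + " " + rr.choose(b, 3));
        System.out.println("global:      " + global().choose(a, 3) + " " + global().choose(b, 3));
        System.out.println("batchGlobal: " + batchGlobal().choose(a, 3) + " " + batchGlobal().choose(b, 3));
        System.out.println("broadcast:   " + broadcast().choose(a, 3));
        System.out.println("partitionBy: " + byFields(1).choose(a, 3) + " " + byFields(1).choose(b, 3));
    }
}
```

Hashing position 0 in `batchGlobal()` follows the diff's own note that "the first field is the batch id" (`new IndexHashGrouping(0)`): two tuples from the same batch always land together, while different batches can spread across partitions.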
> Trident API Improvements
> ------------------------
>
> Key: STORM-1214
> URL: https://issues.apache.org/jira/browse/STORM-1214
> Project: Apache Storm
> Issue Type: Bug
> Reporter: P. Taylor Goetz
> Assignee: P. Taylor Goetz
>
> There are a few idiosyncrasies in the Trident API that can sometimes trip
> developers up (e.g. when and how to set the parallelism of components). There
> are also a few areas where the API could be made slightly more intuitive
> (e.g. add Java 8 streams-like methods like {{filter()}}, {{map()}},
> {{flatMap()}}, etc.).
> Some of these concerns can be addressed through documentation, and some by
> altering the API. Since we are approaching a 1.0 release, it would be good to
> address any API changes before a major release.
> The goal of this JIRA is to identify specific areas of improvement and
> formulate an implementation that addresses them.
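The description asks for Trident methods modeled on Java 8 streams. For reference, here is a minimal sketch of how `filter()`, `map()` and `flatMap()` behave on `java.util.stream.Stream`. This is the JDK API, not whatever Trident signatures come out of this JIRA, and `StreamOpsDemo`/`process` are illustrative names only.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class StreamOpsDemo {

    // Split sentences into words, drop short words, upper-case the rest.
    public static List<String> process(List<String> sentences) {
        return sentences.stream()
                .flatMap(s -> Arrays.stream(s.split(" "))) // one input -> zero or more outputs
                .filter(w -> w.length() > 3)                // keep only elements matching a predicate
                .map(String::toUpperCase)                   // one input -> exactly one output
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // prints [QUICK, BROWN, JUMPS, OVER, LAZY, DOGS]
        System.out.println(process(Arrays.asList("the quick brown fox", "jumps over lazy dogs")));
    }
}
```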
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)