[
https://issues.apache.org/jira/browse/STORM-1271?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15211853#comment-15211853
]
ASF GitHub Bot commented on STORM-1271:
---------------------------------------
Github user revans2 commented on a diff in the pull request:
https://github.com/apache/storm/pull/1249#discussion_r57447253
--- Diff: storm-core/src/jvm/org/apache/storm/daemon/GrouperFactory.java ---
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon;
+
+import com.google.common.collect.Ordering;
+import com.google.common.collect.Sets;
+
+import org.apache.storm.Thrift;
+import org.apache.storm.generated.Grouping;
+import org.apache.storm.grouping.CustomStreamGrouping;
+import org.apache.storm.grouping.LoadMapping;
+import org.apache.storm.task.WorkerTopologyContext;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.utils.Utils;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+
+public class GrouperFactory {
+
+    // A no-op grouper
+    public static Grouper DIRECT = new Grouper() {
+        @Override
+        public List<Integer> outTasks(Integer taskId, List<Object> values, LoadMapping load) {
+            return null;
+        }
+    };
+
+    public static Grouper mkGrouper(WorkerTopologyContext context, String componentId, String streamId, Fields outFields,
+                                    Grouping thriftGrouping,
+                                    List<Integer> unsortedTargetTasks,
+                                    Map topoConf) {
+        final Random random = new Random();
+        final List<Integer> targetTasks = Ordering.natural().sortedCopy(unsortedTargetTasks);
+        final int numTasks = targetTasks.size();
+        switch (Thrift.groupingType(thriftGrouping)) {
+            case FIELDS:
+                if (Thrift.isGlobalGrouping(thriftGrouping)) {
+                    return new Grouper() {
+                        @Override
+                        public List<Integer> outTasks(Integer taskId, List value, LoadMapping load) {
+                            if (targetTasks.isEmpty()) {
+                                return null;
+                            }
+                            // It's possible for target to have multiple tasks if it reads multiple sources
+                            return Collections.singletonList(targetTasks.get(0));
+                        }
+                    };
+                }
+                return new Grouper.FieldsGrouper(outFields, thriftGrouping, targetTasks);
+            case SHUFFLE:
+                return new Grouper.ShuffleGrouper(context, componentId, streamId, targetTasks, topoConf);
+            case ALL:
+                return new Grouper() {
+                    @Override
+                    public List<Integer> outTasks(Integer taskId, List<Object> values, LoadMapping load) {
+                        return targetTasks;
+                    }
+                };
+            case LOCAL_OR_SHUFFLE:
+                Set<Integer> sameTasks = Sets.intersection(Sets.newHashSet(targetTasks), Sets.newHashSet(context.getThisWorkerTasks()));
+                if (sameTasks.isEmpty()) {
+                    return new Grouper.ShuffleGrouper(context, componentId, streamId, targetTasks, topoConf);
+                } else {
+                    return new Grouper.ShuffleGrouper(context, componentId, streamId, new ArrayList<>(sameTasks), topoConf);
+                }
+            case NONE:
+                return new Grouper() {
+                    @Override
+                    public List<Integer> outTasks(Integer taskId, List<Object> values, LoadMapping load) {
+                        int index = random.nextInt(numTasks);
+                        return Collections.singletonList(targetTasks.get(index));
+                    }
+                };
+            case CUSTOM_OBJECT:
+                CustomStreamGrouping customObject =
+                    (CustomStreamGrouping) Thrift.instantiateJavaObject(thriftGrouping.get_custom_object());
+                return new Grouper.CustomGrouper(customObject, context, componentId, streamId, targetTasks);
+            case CUSTOM_SERIALIZED:
+                CustomStreamGrouping
+                    customSerialized =
+                    Utils.javaDeserialize(thriftGrouping.get_custom_serialized(), CustomStreamGrouping.class);
+                return new Grouper.CustomGrouper(customSerialized, context, componentId, streamId, targetTasks);
--- End diff --
If someone created a CustomStreamGrouping that was also a
LoadAwareCustomStreamGrouping, the load-aware portion would never be called,
because we are now using constructors instead of the builder pattern. This
really should be:
```
case CUSTOM_SERIALIZED:
    CustomStreamGrouping customGrouping =
        Utils.javaDeserialize(thriftGrouping.get_custom_serialized(), CustomStreamGrouping.class);
    // Check for the load-aware sub-interface first so its load-aware method is actually used.
    if (customGrouping instanceof LoadAwareCustomStreamGrouping) {
        return new Grouper.LoadAwareCustomGrouper(customGrouping, context, componentId, streamId, targetTasks);
    }
    return new Grouper.CustomGrouper(customGrouping, context, componentId, streamId, targetTasks);
```
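To illustrate why the instanceof check matters, here is a minimal standalone sketch. It does not use Storm's classes: `SimpleGrouping`, `LoadAwareGrouping`, `SketchGrouper`, `LeastLoaded`, and the `Map<Integer, Double>` load table are hypothetical stand-ins I made up for CustomStreamGrouping, the load-aware sub-interface, Grouper, and LoadMapping. The point is only that a wrapper has to test for the sub-interface, otherwise the load-aware overload is never reached.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class LoadAwareDispatchSketch {

    // Hypothetical stand-in for a plain custom grouping.
    interface SimpleGrouping {
        List<Integer> chooseTasks(int taskId, List<Object> values);
    }

    // Hypothetical stand-in for the load-aware sub-interface.
    interface LoadAwareGrouping extends SimpleGrouping {
        List<Integer> chooseTasks(int taskId, List<Object> values, Map<Integer, Double> load);
    }

    // Hypothetical stand-in for the Grouper abstraction in the diff.
    interface SketchGrouper {
        List<Integer> outTasks(int taskId, List<Object> values, Map<Integer, Double> load);
    }

    // Picks the wrapper based on the runtime type, as suggested in the review.
    static SketchGrouper wrap(final SimpleGrouping grouping) {
        if (grouping instanceof LoadAwareGrouping) {
            final LoadAwareGrouping loadAware = (LoadAwareGrouping) grouping;
            return (taskId, values, load) -> loadAware.chooseTasks(taskId, values, load);
        }
        // Without the check above, every grouping would end up here and ignore load.
        return (taskId, values, load) -> grouping.chooseTasks(taskId, values);
    }

    // Example grouping: load-aware overload picks the least loaded target,
    // the plain overload just returns the first target.
    static class LeastLoaded implements LoadAwareGrouping {
        private final List<Integer> targets;

        LeastLoaded(List<Integer> targets) {
            this.targets = targets;
        }

        @Override
        public List<Integer> chooseTasks(int taskId, List<Object> values) {
            return Collections.singletonList(targets.get(0));
        }

        @Override
        public List<Integer> chooseTasks(int taskId, List<Object> values, Map<Integer, Double> load) {
            Integer best = targets.get(0);
            for (Integer t : targets) {
                if (load.getOrDefault(t, 0.0) < load.getOrDefault(best, 0.0)) {
                    best = t;
                }
            }
            return Collections.singletonList(best);
        }
    }

    public static void main(String[] args) {
        Map<Integer, Double> load = new HashMap<>();
        load.put(1, 0.9);
        load.put(2, 0.1);
        SketchGrouper grouper = wrap(new LeastLoaded(Arrays.asList(1, 2)));
        // Task 2 has the lower load, so this prints [2].
        System.out.println(grouper.outTasks(0, Collections.<Object>emptyList(), load));
    }
}
```

If `wrap` skipped the instanceof branch, the same call would return task 1, which is the behavior the comment above is warning about.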
> port backtype.storm.daemon.task to java
> ---------------------------------------
>
> Key: STORM-1271
> URL: https://issues.apache.org/jira/browse/STORM-1271
> Project: Apache Storm
> Issue Type: New Feature
> Components: storm-core
> Reporter: Robert Joseph Evans
> Assignee: Abhishek Agarwal
> Labels: java-migration, jstorm-merger
>
> helper functions for task data and sending tuples
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)