[
https://issues.apache.org/jira/browse/STORM-1271?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15211827#comment-15211827
]
ASF GitHub Bot commented on STORM-1271:
---------------------------------------
Github user revans2 commented on a diff in the pull request:
https://github.com/apache/storm/pull/1249#discussion_r57445892
--- Diff: storm-core/src/jvm/org/apache/storm/daemon/Grouper.java ---
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon;
+
+import org.apache.storm.Config;
+import org.apache.storm.Thrift;
+import org.apache.storm.generated.GlobalStreamId;
+import org.apache.storm.generated.Grouping;
+import org.apache.storm.grouping.CustomStreamGrouping;
+import org.apache.storm.grouping.LoadAwareShuffleGrouping;
+import org.apache.storm.grouping.LoadMapping;
+import org.apache.storm.grouping.ShuffleGrouping;
+import org.apache.storm.task.WorkerTopologyContext;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.utils.TupleUtils;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+public abstract class Grouper {
+
+ /**
+  * Chooses the task ids that a tuple should be sent to.
+  *
+  * @param taskId forwarded unchanged to {@code CustomStreamGrouping.chooseTasks} by
+  *               {@code CustomGrouper}; presumably the id of the emitting task (TODO confirm)
+  * @param values the tuple values used to make the choice
+  * @param load load information; {@code FieldsGrouper} and {@code CustomGrouper} ignore it
+  * @return the chosen task ids
+  */
+ public abstract List<Integer> outTasks(Integer taskId, List<Object>
values, LoadMapping load);
+
+ /**
+  * Grouper that sends each tuple to exactly one target task, selected from the hash of the
+  * tuple's grouping-field values, so equal hashes always map to the same task.
+  */
+ public static class FieldsGrouper extends Grouper {
+
+     private final Fields outFields;
+     private final List<Integer> targetTasks;
+     private final Fields groupFields;
+     private final int numTasks;
+
+     /**
+      * @param outFields the fields of the emitted tuples, used to locate the grouping values
+      * @param thriftGrouping the grouping definition the grouping field names are read from
+      * @param targetTasks the candidate task ids; the list is kept by reference, not copied
+      */
+     public FieldsGrouper(Fields outFields, Grouping thriftGrouping, List<Integer> targetTasks) {
+         this.outFields = outFields;
+         this.targetTasks = targetTasks;
+         this.groupFields = new Fields(Thrift.fieldGrouping(thriftGrouping));
+         this.numTasks = targetTasks.size();
+     }
+
+     /**
+      * Picks the single target task for the given tuple values.
+      *
+      * @param taskId not used by this grouper
+      * @param values the tuple values the grouping fields are selected from
+      * @param load not used by this grouper
+      * @return a one-element list holding the chosen task id
+      */
+     @Override
+     public List<Integer> outTasks(Integer taskId, List<Object> values, LoadMapping load) {
+         int hash = TupleUtils.listHashCode(outFields.select(groupFields, values));
+         // Take the remainder BEFORE the absolute value. Math.abs(Integer.MIN_VALUE) is still
+         // negative, so abs-then-modulo could produce a negative index and make
+         // targetTasks.get() throw. For every other hash the result is the same as before.
+         int targetTaskIndex = Math.abs(hash % numTasks);
+         return Collections.singletonList(targetTasks.get(targetTaskIndex));
+     }
+ }
+
+ /**
+  * Shuffle grouper that chooses its implementation once, at construction time: a plain
+  * {@code ShuffleGrouping} when {@code Config.TOPOLOGY_DISABLE_LOADAWARE_MESSAGING} is set to
+  * true in the topology configuration, otherwise a {@code LoadAwareShuffleGrouping}.
+  */
+ public static class ShuffleGrouper extends Grouper {
+
+     // Assigned exactly once by the constructor, hence final.
+     private final Grouper delegate;
+
+     /**
+      * @param context handed to the underlying grouping
+      * @param componentId handed to the underlying grouping
+      * @param streamId handed to the underlying grouping
+      * @param targetTasks the candidate task ids
+      * @param topoConf the topology configuration; a missing (null) setting selects the
+      *                 load-aware variant, and a non-null value that is not a Boolean fails
+      *                 the cast with a ClassCastException
+      */
+     public ShuffleGrouper(WorkerTopologyContext context, String componentId, String streamId,
+                           List<Integer> targetTasks, Map topoConf) {
+         // Look the setting up once instead of once for the null check and again for the cast.
+         Object disableLoadAware = topoConf.get(Config.TOPOLOGY_DISABLE_LOADAWARE_MESSAGING);
+         if (disableLoadAware != null && (boolean) disableLoadAware) {
+             delegate = new CustomGrouper(new ShuffleGrouping(), context, componentId, streamId,
+                     targetTasks);
+         } else {
+             delegate = new LoadAwareCustomGrouper(new LoadAwareShuffleGrouping(), context,
+                     componentId, streamId, targetTasks);
+         }
+     }
+
+     /**
+      * Forwards the call unchanged to the grouper selected in the constructor.
+      */
+     @Override
+     public List<Integer> outTasks(Integer taskId, List<Object> values, LoadMapping load) {
+         return delegate.outTasks(taskId, values, load);
+     }
+ }
+
+ public static class CustomGrouper extends Grouper {
+
+ protected final CustomStreamGrouping customStreamGrouping;
+
+ public CustomGrouper(CustomStreamGrouping customStreamGrouping,
WorkerTopologyContext context, String componentId, String streamId,
+ List<Integer> targetTasks) {
+ this.customStreamGrouping = customStreamGrouping;
+ this.customStreamGrouping.prepare(context, new
GlobalStreamId(componentId, streamId), targetTasks);
+ }
+
+ @Override
+ public List<Integer> outTasks(Integer taskId, List<Object> values,
LoadMapping load) {
+ return customStreamGrouping.chooseTasks(taskId, values);
+ }
+ }
+
+ public static class LoadAwareCustomGrouper extends CustomGrouper {
+
+ public LoadAwareCustomGrouper(LoadAwareShuffleGrouping
customStreamGrouping, WorkerTopologyContext context, String componentId,
--- End diff --
`LoadAwareShuffleGrouping` should be `LoadAwareCustomStreamGrouping`
instead.
> port backtype.storm.daemon.task to java
> ---------------------------------------
>
> Key: STORM-1271
> URL: https://issues.apache.org/jira/browse/STORM-1271
> Project: Apache Storm
> Issue Type: New Feature
> Components: storm-core
> Reporter: Robert Joseph Evans
> Assignee: Abhishek Agarwal
> Labels: java-migration, jstorm-merger
>
> helper functions for task data and sending tuples
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)