[ 
https://issues.apache.org/jira/browse/STORM-1271?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15211827#comment-15211827
 ] 

ASF GitHub Bot commented on STORM-1271:
---------------------------------------

Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1249#discussion_r57445892
  
    --- Diff: storm-core/src/jvm/org/apache/storm/daemon/Grouper.java ---
    @@ -0,0 +1,111 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.daemon;
    +
    +import org.apache.storm.Config;
    +import org.apache.storm.Thrift;
    +import org.apache.storm.generated.GlobalStreamId;
    +import org.apache.storm.generated.Grouping;
    +import org.apache.storm.grouping.CustomStreamGrouping;
    +import org.apache.storm.grouping.LoadAwareShuffleGrouping;
    +import org.apache.storm.grouping.LoadMapping;
    +import org.apache.storm.grouping.ShuffleGrouping;
    +import org.apache.storm.task.WorkerTopologyContext;
    +import org.apache.storm.tuple.Fields;
    +import org.apache.storm.utils.TupleUtils;
    +
    +import java.util.Collections;
    +import java.util.List;
    +import java.util.Map;
    +
    +public abstract class Grouper {
    +
    +    public abstract List<Integer> outTasks(Integer taskId, List<Object> 
values, LoadMapping load);
    +
    +    public static class FieldsGrouper extends Grouper {
    +
    +        private final Fields outFields;
    +        private final List<Integer> targetTasks;
    +        private final Fields groupFields;
    +        private final int numTasks;
    +
    +        public FieldsGrouper(Fields outFields, Grouping thriftGrouping,
    +                             List<Integer> targetTasks) {
    +            this.outFields = outFields;
    +            this.targetTasks = targetTasks;
    +            this.groupFields = new 
Fields(Thrift.fieldGrouping(thriftGrouping));
    +            this.numTasks = targetTasks.size();
    +        }
    +
    +        @Override
    +        public List<Integer> outTasks(Integer taskId, List<Object> values, 
LoadMapping load) {
    +            int targetTaskIndex = 
Math.abs(TupleUtils.listHashCode(outFields.select(groupFields, values))) % 
numTasks;
    +            return 
Collections.singletonList(targetTasks.get(targetTaskIndex));
    +        }
    +    }
    +
    +    public static class ShuffleGrouper extends Grouper {
    +
    +        private Grouper delegate;
    +
    +        public ShuffleGrouper(WorkerTopologyContext context, String 
componentId, String streamId,
    +                              List<Integer> targetTasks, Map topoConf) {
    +            if (null != 
topoConf.get(Config.TOPOLOGY_DISABLE_LOADAWARE_MESSAGING) && (boolean) topoConf
    +                .get(Config.TOPOLOGY_DISABLE_LOADAWARE_MESSAGING)) {
    +                delegate = new CustomGrouper(new ShuffleGrouping(), 
context, componentId, streamId, targetTasks);
    +            } else {
    +                delegate = new LoadAwareCustomGrouper(new 
LoadAwareShuffleGrouping(), context, componentId, streamId, targetTasks);
    +            }
    +        }
    +
    +        @Override
    +        public List<Integer> outTasks(Integer taskId, List<Object> values, 
LoadMapping load) {
    +            return delegate.outTasks(taskId, values, load);
    +        }
    +    }
    +
    +    public static class CustomGrouper extends Grouper {
    +
    +        protected final CustomStreamGrouping customStreamGrouping;
    +
    +        public CustomGrouper(CustomStreamGrouping customStreamGrouping, 
WorkerTopologyContext context, String componentId, String streamId,
    +                             List<Integer> targetTasks) {
    +            this.customStreamGrouping = customStreamGrouping;
    +            this.customStreamGrouping.prepare(context, new 
GlobalStreamId(componentId, streamId), targetTasks);
    +        }
    +
    +        @Override
    +        public List<Integer> outTasks(Integer taskId, List<Object> values, 
LoadMapping load) {
    +            return customStreamGrouping.chooseTasks(taskId, values);
    +        }
    +    }
    +
    +    public static class LoadAwareCustomGrouper extends CustomGrouper {
    +
    +        public LoadAwareCustomGrouper(LoadAwareShuffleGrouping 
customStreamGrouping, WorkerTopologyContext context, String componentId,
    --- End diff --
    
    The constructor parameter type `LoadAwareShuffleGrouping` should be `LoadAwareCustomStreamGrouping` instead.


> port backtype.storm.daemon.task to java
> ---------------------------------------
>
>                 Key: STORM-1271
>                 URL: https://issues.apache.org/jira/browse/STORM-1271
>             Project: Apache Storm
>          Issue Type: New Feature
>          Components: storm-core
>            Reporter: Robert Joseph Evans
>            Assignee: Abhishek Agarwal
>              Labels: java-migration, jstorm-merger
>
> Helper functions for task data and sending tuples.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to