[ 
https://issues.apache.org/jira/browse/STORM-1271?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15211863#comment-15211863
 ] 

ASF GitHub Bot commented on STORM-1271:
---------------------------------------

Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1249#discussion_r57447354
  
    --- Diff: storm-core/src/jvm/org/apache/storm/daemon/GrouperFactory.java ---
    @@ -0,0 +1,110 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.daemon;
    +
    +import com.google.common.collect.Ordering;
    +import com.google.common.collect.Sets;
    +
    +import org.apache.storm.Thrift;
    +import org.apache.storm.generated.Grouping;
    +import org.apache.storm.grouping.CustomStreamGrouping;
    +import org.apache.storm.grouping.LoadMapping;
    +import org.apache.storm.task.WorkerTopologyContext;
    +import org.apache.storm.tuple.Fields;
    +import org.apache.storm.utils.Utils;
    +
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Random;
    +import java.util.Set;
    +
    +public class GrouperFactory {
    +
    +    // A no-op grouper
    +    public static Grouper DIRECT = new Grouper() {
    +        @Override
    +        public List<Integer> outTasks(Integer taskId, List<Object> values, 
LoadMapping load) {
    +            return null;
    +        }
    +    };
    +
    +    public static Grouper mkGrouper(WorkerTopologyContext context, String 
componentId, String streamId, Fields outFields,
    +                                    Grouping thriftGrouping,
    +                                    List<Integer> unsortedTargetTasks,
    +                                    Map topoConf) {
    +        final Random random = new Random();
    +        final List<Integer> targetTasks = 
Ordering.natural().sortedCopy(unsortedTargetTasks);
    +        final int numTasks = targetTasks.size();
    +        switch (Thrift.groupingType(thriftGrouping)) {
    +            case FIELDS:
    +                if (Thrift.isGlobalGrouping(thriftGrouping)) {
    +                    return new Grouper() {
    +                        @Override
    +                        public List<Integer> outTasks(Integer taskId, List 
value, LoadMapping load) {
    +                            if (targetTasks.isEmpty()) {
    +                                return null;
    +                            }
    +                            // It's possible for target to have multiple 
tasks if it reads multiple sources
    +                            return 
Collections.singletonList(targetTasks.get(0));
    +                        }
    +                    };
    +                }
    +                return new Grouper.FieldsGrouper(outFields, 
thriftGrouping, targetTasks);
    +            case SHUFFLE:
    +                return new Grouper.ShuffleGrouper(context, componentId, 
streamId, targetTasks, topoConf);
    +            case ALL:
    +                return new Grouper() {
    +                    @Override
    +                    public List<Integer> outTasks(Integer taskId, 
List<Object> values, LoadMapping load) {
    +                        return targetTasks;
    +                    }
    +                };
    +            case LOCAL_OR_SHUFFLE:
    +                Set<Integer> sameTasks = 
Sets.intersection(Sets.newHashSet(targetTasks), 
Sets.newHashSet(context.getThisWorkerTasks()));
    +                if (sameTasks.isEmpty()) {
    +                    return new Grouper.ShuffleGrouper(context, 
componentId, streamId, targetTasks, topoConf);
    +                } else {
    +                    return new Grouper.ShuffleGrouper(context, 
componentId, streamId, new ArrayList<>(sameTasks), topoConf);
    +                }
    +            case NONE:
    +                return new Grouper() {
    +                    @Override
    +                    public List<Integer> outTasks(Integer taskId, 
List<Object> values, LoadMapping load) {
    +                        int index = random.nextInt(numTasks);
    +                        return 
Collections.singletonList(targetTasks.get(index));
    +                    }
    +                };
    +            case CUSTOM_OBJECT:
    +                CustomStreamGrouping customObject =
    +                    (CustomStreamGrouping) 
Thrift.instantiateJavaObject(thriftGrouping.get_custom_object());
    +                return new Grouper.CustomGrouper(customObject, context, 
componentId, streamId, targetTasks);
    --- End diff --
    
    If someone created a CustormStreamGrouping that was also 
LoadAwareCustomStreamGrouping the LoadAware portion would never be called, 
because we are now using constructors instead of the builder pattern.  This 
really should be.
    
    ```
    case CUSTOM_OBJECT:
      CustomStreamGrouping
        customGrouping =
          (CustomStreamGrouping) 
Thrift.instantiateJavaObject(thriftGrouping.get_custom_object());
        if (customGrouping instanceof CustomLoadAwareStreamGrouping) {
          return new Grouper.LoadAwareCustomGrouper(customSerialized, context, 
componentId, streamId, targetTasks);
        }
        return new Grouper.CustomGrouper(customSerialized, context, 
componentId, streamId, targetTasks);
    ```


> port backtype.storm.daemon.task to java
> ---------------------------------------
>
>                 Key: STORM-1271
>                 URL: https://issues.apache.org/jira/browse/STORM-1271
>             Project: Apache Storm
>          Issue Type: New Feature
>          Components: storm-core
>            Reporter: Robert Joseph Evans
>            Assignee: Abhishek Agarwal
>              Labels: java-migration, jstorm-merger
>
> helper functions for task data and sending tuples



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to