[ 
https://issues.apache.org/jira/browse/STORM-1271?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15211875#comment-15211875
 ] 

ASF GitHub Bot commented on STORM-1271:
---------------------------------------

Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1249#discussion_r57447877
  
    --- Diff: storm-core/src/jvm/org/apache/storm/daemon/Task.java ---
    @@ -0,0 +1,246 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.daemon;
    +
    +import org.apache.storm.Config;
    +import org.apache.storm.Thrift;
    +import org.apache.storm.daemon.metrics.BuiltinMetrics;
    +import org.apache.storm.daemon.metrics.BuiltinMetricsUtil;
    +import org.apache.storm.generated.Bolt;
    +import org.apache.storm.generated.ComponentObject;
    +import org.apache.storm.generated.JavaObject;
    +import org.apache.storm.generated.ShellComponent;
    +import org.apache.storm.generated.SpoutSpec;
    +import org.apache.storm.generated.StateSpoutSpec;
    +import org.apache.storm.generated.StormTopology;
    +import org.apache.storm.grouping.LoadMapping;
    +import org.apache.storm.hooks.ITaskHook;
    +import org.apache.storm.hooks.info.EmitInfo;
    +import org.apache.storm.metric.api.IMetric;
    +import org.apache.storm.spout.ShellSpout;
    +import org.apache.storm.stats.CommonStats;
    +import org.apache.storm.task.ShellBolt;
    +import org.apache.storm.task.TopologyContext;
    +import org.apache.storm.task.WorkerTopologyContext;
    +import org.apache.storm.tuple.Fields;
    +import org.apache.storm.tuple.Tuple;
    +import org.apache.storm.tuple.TupleImpl;
    +import org.apache.storm.utils.ConfigUtils;
    +import org.apache.storm.utils.Utils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.concurrent.Callable;
    +
    +public class Task {
    +
    +    private static final Logger LOG = LoggerFactory.getLogger(Task.class);
    +
    +    private Map executorData;
    +    private Map workerData;
    +    private TopologyContext systemTopologyContext;
    +    private TopologyContext userTopologyContext;
    +    private WorkerTopologyContext workerTopologyContext;
    +    private LoadMapping loadMapping;
    +    private Integer taskId;
    +    private String componentId;
    +    private Object taskObject;
    +    private Map stormConf;
    +    private Callable<Boolean> emitSampler;
    +    private CommonStats executorStats;
    +    private Map<String, Map<String, Grouper>> streamComponentToGrouper;
    +    private BuiltinMetrics builtInMetrics;
    +    private boolean debug;
    +
    +    public Task(Map executorData, Integer taskId) throws IOException {
    +        this.taskId = taskId;
    +        this.executorData = executorData;
    +        this.workerData = (Map) executorData.get("worker");
    +        this.stormConf = (Map) executorData.get("storm-conf");
    +        this.componentId = (String) executorData.get("component-id");
    +        this.streamComponentToGrouper = (Map<String, Map<String, 
Grouper>>) executorData.get("stream->component->grouper");
    +        this.executorStats = (CommonStats) executorData.get("stats");
    +        this.builtInMetrics = BuiltinMetricsUtil.mkData((String) 
executorData.get("type"), this.executorStats);
    +        this.workerTopologyContext = (WorkerTopologyContext) 
executorData.get("worker-context");
    +        this.emitSampler = ConfigUtils.mkStatsSampler(stormConf);
    +        this.loadMapping = (LoadMapping) workerData.get("load-mapping");
    +        this.systemTopologyContext = 
mkTopologyContextBuilder((StormTopology) workerData.get("system-topology"));
    +        this.userTopologyContext = 
mkTopologyContextBuilder((StormTopology) workerData.get("topology"));
    +        this.taskObject = mkTaskObject();
    +        this.debug = stormConf.containsKey(Config.TOPOLOGY_DEBUG) && 
(Boolean) stormConf.get(Config.TOPOLOGY_DEBUG);
    +        this.addTaskHooks();
    +    }
    +
    +    public List<Integer> getOutgoingTasks(Integer outTaskId, String 
stream, List<Object> values) {
    +        if (debug) {
    +            LOG.info("Emitting direct: {}; {} {} {} ", outTaskId, 
componentId, stream, values);
    +        }
    +        String targetComponent = 
workerTopologyContext.getComponentId(outTaskId);
    +        Map<String, Grouper> componentGrouping = 
streamComponentToGrouper.get(stream);
    +        Grouper grouping = componentGrouping.get(targetComponent);
    +        if (null == grouping) {
    +            outTaskId = null;
    +        }
    +        if (grouping != null && grouping != GrouperFactory.DIRECT) {
    +            throw new IllegalArgumentException("Cannot emitDirect to a 
task expecting a regular grouping");
    +        }
    +        new EmitInfo(values, stream, taskId, 
Collections.singletonList(outTaskId)).applyOn(userTopologyContext);
    +        try {
    +            if (emitSampler.call()) {
    +                executorStats.emittedTuple(stream);
    +                if (null != outTaskId) {
    +                    executorStats.transferredTuples(stream, 1);
    +                }
    +            }
    +        } catch (Exception e) {
    +            throw new RuntimeException(e);
    +        }
    +        if (null != outTaskId) {
    +            return Collections.singletonList(outTaskId);
    +        }
    +        return null;
    +    }
    +
    +    public List<Integer> getOutgoingTasks(String stream, List<Object> 
values) {
    +        if (debug) {
    +            LOG.info("Emitting: {} {} {}", componentId, stream, values);
    +        }
    +        List<Integer> outTasks = new ArrayList<>();
    +        if (!streamComponentToGrouper.containsKey(stream)) {
    +            throw new IllegalArgumentException("Unknown stream ID: " + 
stream);
    +        }
    +        if (null != streamComponentToGrouper.get(stream)) {
    +            // null value for __system
    +            for (Grouper grouper : 
streamComponentToGrouper.get(stream).values()) {
    +                if (grouper == GrouperFactory.DIRECT) {
    +                    throw new IllegalArgumentException("Cannot do regular 
emit to direct stream");
    +                }
    +                List<Integer> compTasks = grouper.outTasks(taskId, values, 
loadMapping);
    +                outTasks.addAll(compTasks);
    +            }
    +        }
    +        new EmitInfo(values, stream, taskId, 
outTasks).applyOn(userTopologyContext);
    +        try {
    +            if (emitSampler.call()) {
    +                executorStats.emittedTuple(stream);
    +                executorStats.transferredTuples(stream, outTasks.size());
    +            }
    +        } catch (Exception e) {
    +            throw new RuntimeException(e);
    +        }
    +        return outTasks;
    +    }
    +
    +    public Tuple getTuple(String stream, List values) {
    +        return new TupleImpl(systemTopologyContext, values, 
systemTopologyContext.getThisTaskId(), stream);
    +    }
    +
    +    public Integer getTaskId() {
    +        return taskId;
    +    }
    +
    +    public String getComponentId() {
    +        return componentId;
    +    }
    +
    +    public TopologyContext getUserContext() throws IOException {
    +        return userTopologyContext;
    +    }
    +
    +    public Object getTaskObject() {
    +        return taskObject;
    +    }
    +
    +    public BuiltinMetrics getBuiltInMetrics() {
    +        return builtInMetrics;
    +    }
    +
    +    private TopologyContext mkTopologyContextBuilder(StormTopology 
topology) throws IOException {
    +        Map conf = (Map) workerData.get("conf");
    +        return new TopologyContext(
    +            topology,
    +            (Map) workerData.get("storm-conf"),
    +            (Map<Integer, String>) workerData.get("task->component"),
    +            (Map<String, List<Integer>>) 
workerData.get("component->sorted-tasks"),
    +            (Map<String, Map<String, Fields>>) 
workerData.get("component->stream->fields"),
    +            (String) workerData.get("storm-id"),
    +            
ConfigUtils.supervisorStormResourcesPath(ConfigUtils.supervisorStormDistRoot(conf,
 (String) workerData.get("storm-id"))),
    +            ConfigUtils.workerPidsRoot(conf, (String) 
workerData.get("worker-id")),
    +            taskId,
    +            (Integer) workerData.get("port"),
    +            (List<Integer>) workerData.get("task-ids"),
    +            (Map<String, Object>) 
workerData.get("default-shared-resources"),
    +            (Map<String, Object>) workerData.get("user-shared-resources"),
    +            (Map<String, Object>) executorData.get("shared-executor-data"),
    +            (Map<Integer, Map<Integer, Map<String, IMetric>>>) 
executorData.get("interval->task->metric-registry"),
    +            (clojure.lang.Atom) 
executorData.get("open-or-prepare-was-called?")
    --- End diff --
    
    I assume this `clojure.lang.Atom` will become an `AtomicBoolean` once Executor is ported to Java.


> port backtype.storm.daemon.task to java
> ---------------------------------------
>
>                 Key: STORM-1271
>                 URL: https://issues.apache.org/jira/browse/STORM-1271
>             Project: Apache Storm
>          Issue Type: New Feature
>          Components: storm-core
>            Reporter: Robert Joseph Evans
>            Assignee: Abhishek Agarwal
>              Labels: java-migration, jstorm-merger
>
> Helper functions for task data and for sending tuples.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to