[ 
https://issues.apache.org/jira/browse/STORM-1277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15324397#comment-15324397
 ] 

ASF GitHub Bot commented on STORM-1277:
---------------------------------------

Github user abhishekagarwal87 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1445#discussion_r66607521
  
    --- Diff: storm-core/src/jvm/org/apache/storm/executor/BaseExecutor.java ---
    @@ -0,0 +1,153 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.executor;
    +
    +import com.google.common.collect.Lists;
    +import com.lmax.disruptor.EventHandler;
    +import java.net.UnknownHostException;
    +import org.apache.storm.Config;
    +import org.apache.storm.Constants;
    +import org.apache.storm.StormTimer;
    +import org.apache.storm.daemon.StormCommon;
    +import org.apache.storm.daemon.Task;
    +import org.apache.storm.executor.error.IReportError;
    +import org.apache.storm.metric.api.IMetric;
    +import org.apache.storm.metric.api.IMetricsConsumer;
    +import org.apache.storm.task.WorkerTopologyContext;
    +import org.apache.storm.tuple.AddressedTuple;
    +import org.apache.storm.tuple.TupleImpl;
    +import org.apache.storm.tuple.Values;
    +import org.apache.storm.utils.DisruptorQueue;
    +import org.apache.storm.utils.Utils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.*;
    +import java.util.concurrent.Callable;
    +
    +public abstract class BaseExecutor implements Callable, EventHandler {
    +
    +    private static final Logger LOG = 
LoggerFactory.getLogger(BaseExecutor.class);
    +
    +    protected final ExecutorData executorData;
    +    protected final Map stormConf;
    +    protected final String componentId;
    +    protected final WorkerTopologyContext workerTopologyContext;
    +    protected final IReportError reportError;
    +    protected final Callable<Boolean> sampler;
    +    protected final Random rand;
    +    protected final DisruptorQueue transferQueue;
    +    protected final DisruptorQueue receiveQueue;
    +    protected final Map<Integer, Task> idToTask;
    +    protected final Map<String, String> credentials;
    +    protected final Boolean isDebug;
    +    protected final Boolean isEventLoggers;
    +    protected String hostname;
    +
    +    public BaseExecutor(ExecutorData executorData, Map<Integer, Task> 
idToTask, Map<String, String> credentials) {
    +        this.executorData = executorData;
    +        this.stormConf = executorData.getStormConf();
    +        this.componentId = executorData.getComponentId();
    +        this.workerTopologyContext = 
executorData.getWorkerTopologyContext();
    +        this.reportError = executorData.getReportError();
    +        this.sampler = executorData.getSampler();
    +        this.rand = new Random(Utils.secureRandomLong());
    +        this.transferQueue = executorData.getBatchTransferWorkerQueue();
    +        this.receiveQueue = executorData.getReceiveQueue();
    +        this.idToTask = idToTask;
    +        this.credentials = credentials;
    +        this.isDebug = 
Utils.getBoolean(stormConf.get(Config.TOPOLOGY_DEBUG), false);
    +        this.isEventLoggers = StormCommon.hasEventLoggers(stormConf);
    +
    +        try {
    +            this.hostname = Utils.hostname(stormConf);
    +        } catch (UnknownHostException ignored) {
    +            this.hostname = "";
    +        }
    +    }
    +
    +    @SuppressWarnings("unchecked")
    +    @Override
    +    public void onEvent(Object event, long seq, boolean endOfBatch) throws 
Exception {
    +        ArrayList<AddressedTuple> addressedTuples = 
(ArrayList<AddressedTuple>) event;
    +        for (AddressedTuple addressedTuple : addressedTuples) {
    +            TupleImpl tuple = (TupleImpl) addressedTuple.getTuple();
    +            int taskId = addressedTuple.getDest();
    +            if (isDebug) {
    +                LOG.info("Processing received message FOR {} TUPLE: {}", 
taskId, tuple);
    +            }
    +            if (taskId != AddressedTuple.BROADCAST_DEST) {
    +                tupleActionFn(taskId, tuple);
    +            } else {
    +                for (Integer t : executorData.getTaskIds()) {
    +                    tupleActionFn(t, tuple);
    +                }
    +            }
    +        }
    +    }
    +
    +    public void metricsTick(Task taskData, TupleImpl tuple) {
    +        try {
    +            Integer interval = tuple.getInteger(0);
    +            int taskId = taskData.getTaskId();
    +            Map<Integer, Map<String, IMetric>> taskToMetricToRegistry = 
executorData.getIntervalToTaskToMetricToRegistry().get(interval);
    +            Map<String, IMetric> nameToRegistry = null;
    +            if (taskToMetricToRegistry != null) {
    --- End diff --
    
    This null check is not present in the Clojure code.


> port backtype.storm.daemon.executor to java
> -------------------------------------------
>
>                 Key: STORM-1277
>                 URL: https://issues.apache.org/jira/browse/STORM-1277
>             Project: Apache Storm
>          Issue Type: New Feature
>          Components: storm-core
>            Reporter: Robert Joseph Evans
>            Assignee: Cody
>              Labels: java-migration, jstorm-merger
>
> https://github.com/apache/storm/tree/jstorm-import/jstorm-core/src/main/java/com/alibaba/jstorm/task
>  kind of.  Tasks and executors are combined in JStorm.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to