[ https://issues.apache.org/jira/browse/STORM-1277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15324397#comment-15324397 ]
ASF GitHub Bot commented on STORM-1277: --------------------------------------- Github user abhishekagarwal87 commented on a diff in the pull request: https://github.com/apache/storm/pull/1445#discussion_r66607521 --- Diff: storm-core/src/jvm/org/apache/storm/executor/BaseExecutor.java --- @@ -0,0 +1,153 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.executor; + +import com.google.common.collect.Lists; +import com.lmax.disruptor.EventHandler; +import java.net.UnknownHostException; +import org.apache.storm.Config; +import org.apache.storm.Constants; +import org.apache.storm.StormTimer; +import org.apache.storm.daemon.StormCommon; +import org.apache.storm.daemon.Task; +import org.apache.storm.executor.error.IReportError; +import org.apache.storm.metric.api.IMetric; +import org.apache.storm.metric.api.IMetricsConsumer; +import org.apache.storm.task.WorkerTopologyContext; +import org.apache.storm.tuple.AddressedTuple; +import org.apache.storm.tuple.TupleImpl; +import org.apache.storm.tuple.Values; +import org.apache.storm.utils.DisruptorQueue; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.*; +import java.util.concurrent.Callable; + +public abstract class BaseExecutor implements Callable, EventHandler { + + private static final Logger LOG = LoggerFactory.getLogger(BaseExecutor.class); + + protected final ExecutorData executorData; + protected final Map stormConf; + protected final String componentId; + protected final WorkerTopologyContext workerTopologyContext; + protected final IReportError reportError; + protected final Callable<Boolean> sampler; + protected final Random rand; + protected final DisruptorQueue transferQueue; + protected final DisruptorQueue receiveQueue; + protected final Map<Integer, Task> idToTask; + protected final Map<String, String> credentials; + protected final Boolean isDebug; + protected final Boolean isEventLoggers; + protected String hostname; + + public BaseExecutor(ExecutorData executorData, Map<Integer, Task> idToTask, Map<String, String> credentials) { + this.executorData = executorData; + this.stormConf = executorData.getStormConf(); + this.componentId = executorData.getComponentId(); + this.workerTopologyContext = executorData.getWorkerTopologyContext(); + this.reportError = 
executorData.getReportError(); + this.sampler = executorData.getSampler(); + this.rand = new Random(Utils.secureRandomLong()); + this.transferQueue = executorData.getBatchTransferWorkerQueue(); + this.receiveQueue = executorData.getReceiveQueue(); + this.idToTask = idToTask; + this.credentials = credentials; + this.isDebug = Utils.getBoolean(stormConf.get(Config.TOPOLOGY_DEBUG), false); + this.isEventLoggers = StormCommon.hasEventLoggers(stormConf); + + try { + this.hostname = Utils.hostname(stormConf); + } catch (UnknownHostException ignored) { + this.hostname = ""; + } + } + + @SuppressWarnings("unchecked") + @Override + public void onEvent(Object event, long seq, boolean endOfBatch) throws Exception { + ArrayList<AddressedTuple> addressedTuples = (ArrayList<AddressedTuple>) event; + for (AddressedTuple addressedTuple : addressedTuples) { + TupleImpl tuple = (TupleImpl) addressedTuple.getTuple(); + int taskId = addressedTuple.getDest(); + if (isDebug) { + LOG.info("Processing received message FOR {} TUPLE: {}", taskId, tuple); + } + if (taskId != AddressedTuple.BROADCAST_DEST) { + tupleActionFn(taskId, tuple); + } else { + for (Integer t : executorData.getTaskIds()) { + tupleActionFn(t, tuple); + } + } + } + } + + public void metricsTick(Task taskData, TupleImpl tuple) { + try { + Integer interval = tuple.getInteger(0); + int taskId = taskData.getTaskId(); + Map<Integer, Map<String, IMetric>> taskToMetricToRegistry = executorData.getIntervalToTaskToMetricToRegistry().get(interval); + Map<String, IMetric> nameToRegistry = null; + if (taskToMetricToRegistry != null) { --- End diff -- This null check is not present in the Clojure code. 
> port backtype.storm.daemon.executor to java > ------------------------------------------- > > Key: STORM-1277 > URL: https://issues.apache.org/jira/browse/STORM-1277 > Project: Apache Storm > Issue Type: New Feature > Components: storm-core > Reporter: Robert Joseph Evans > Assignee: Cody > Labels: java-migration, jstorm-merger > > https://github.com/apache/storm/tree/jstorm-import/jstorm-core/src/main/java/com/alibaba/jstorm/task > kind of. Tasks and executors are combined in JStorm. -- This message was sent by Atlassian JIRA (v6.3.4#6332)