[
https://issues.apache.org/jira/browse/STORM-1277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15401651#comment-15401651
]
ASF GitHub Bot commented on STORM-1277:
---------------------------------------
Github user HeartSaVioR commented on a diff in the pull request:
https://github.com/apache/storm/pull/1445#discussion_r72934070
--- Diff:
storm-core/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java ---
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.executor.bolt;
+
+import clojure.lang.Atom;
+import com.google.common.collect.ImmutableMap;
+import java.util.List;
+import org.apache.storm.Constants;
+import org.apache.storm.ICredentialsListener;
+import org.apache.storm.daemon.Task;
+import org.apache.storm.daemon.metrics.BuiltinMetricsUtil;
+import org.apache.storm.executor.Executor;
+import org.apache.storm.hooks.info.BoltExecuteInfo;
+import org.apache.storm.stats.BoltExecutorStats;
+import org.apache.storm.task.IBolt;
+import org.apache.storm.task.IOutputCollector;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.tuple.TupleImpl;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.DisruptorQueue;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.Callable;
+
+public class BoltExecutor extends Executor {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(BoltExecutor.class);
+
+ private final Callable<Boolean> executeSampler;
+
+ public BoltExecutor(Map workerData, List<Long> executorId, Map<String,
String> credentials) {
+ super(workerData, executorId, credentials);
+ this.executeSampler = ConfigUtils.mkStatsSampler(stormConf);
+ }
+
+ @Override
+ public void init(Map<Integer, Task> idToTask) {
+ this.idToTask = idToTask;
+ LOG.info("Preparing bolt {}:{}", componentId, idToTask.keySet());
+ for (Map.Entry<Integer, Task> entry : idToTask.entrySet()) {
+ Task taskData = entry.getValue();
+ IBolt boltObject = (IBolt) taskData.getTaskObject();
+ TopologyContext userContext = taskData.getUserContext();
+ taskData.getBuiltInMetrics().registerAll(stormConf,
userContext);
+ if (boltObject instanceof ICredentialsListener) {
+ ((ICredentialsListener)
boltObject).setCredentials(credentials);
+ }
+ if (Constants.SYSTEM_COMPONENT_ID.equals(componentId)) {
+ Map<String, DisruptorQueue> map =
ImmutableMap.of("sendqueue", transferQueue, "receive", receiveQueue,
+ "transfer", (DisruptorQueue)
workerData.get("transfer-queue"));
+ BuiltinMetricsUtil.registerQueueMetrics(map, stormConf,
userContext);
+
+ Map cachedNodePortToSocket = (Map) ((Atom)
workerData.get("cached-node+port->socket")).deref();
+
BuiltinMetricsUtil.registerIconnectionClientMetrics(cachedNodePortToSocket,
stormConf, userContext);
+
BuiltinMetricsUtil.registerIconnectionServerMetric(workerData.get("receiver"),
stormConf, userContext);
+ } else {
+ Map<String, DisruptorQueue> map =
ImmutableMap.of("sendqueue", transferQueue, "receive", receiveQueue);
+ BuiltinMetricsUtil.registerQueueMetrics(map, stormConf,
userContext);
+ }
+
+ IOutputCollector outputCollector = new
BoltOutputCollectorImpl(this, taskData, entry.getKey(), rand, isEventLoggers,
isDebug);
+ boltObject.prepare(stormConf, userContext, new
OutputCollector(outputCollector));
+ }
+ openOrPrepareWasCalled.set(true);
+ LOG.info("Prepared bolt {}:{}", componentId, idToTask.keySet());
+ setupMetrics();
+ }
+
+ @Override
+ public Object call() throws Exception {
+ while (!stormActive.get()) {
--- End diff --
Same here. This wait should block only once, in init(), instead of on every loop iteration.
> port backtype.storm.daemon.executor to java
> -------------------------------------------
>
> Key: STORM-1277
> URL: https://issues.apache.org/jira/browse/STORM-1277
> Project: Apache Storm
> Issue Type: New Feature
> Components: storm-core
> Reporter: Robert Joseph Evans
> Assignee: Cody
> Labels: java-migration, jstorm-merger
>
> https://github.com/apache/storm/tree/jstorm-import/jstorm-core/src/main/java/com/alibaba/jstorm/task
> Kind of: tasks and executors are combined in JStorm.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)