http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/messaging/netty/Server.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/Server.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/Server.java index 1d07497..0901cc3 100644 --- a/storm-client/src/jvm/org/apache/storm/messaging/netty/Server.java +++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/Server.java @@ -24,10 +24,10 @@ import org.apache.storm.messaging.IConnectionCallback; import org.apache.storm.messaging.TaskMessage; import org.apache.storm.metric.api.IMetric; import org.apache.storm.metric.api.IStatefulObject; +import org.apache.storm.serialization.KryoValuesDeserializer; import org.apache.storm.serialization.KryoValuesSerializer; import org.apache.storm.utils.ObjectReader; -import java.io.IOException; import java.net.InetSocketAddress; import java.util.Arrays; import java.util.Collection; @@ -39,6 +39,8 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; + import org.jboss.netty.bootstrap.ServerBootstrap; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelFactory; @@ -62,9 +64,10 @@ class Server extends ConnectionWithStatus implements IStatefulObject, ISaslServe final ServerBootstrap bootstrap; private volatile boolean closing = false; - List<TaskMessage> closeMessage = Arrays.asList(new TaskMessage(-1, null)); - private KryoValuesSerializer _ser; - private IConnectionCallback _cb = null; + KryoValuesSerializer _ser; + KryoValuesDeserializer deser; + private IConnectionCallback _cb = null; + private Supplier<Object> newConnectionResponse; private final int boundPort; @SuppressWarnings("rawtypes") @@ -72,6 +75,7 @@ class Server extends ConnectionWithStatus implements IStatefulObject, ISaslServe this.topoConf = topoConf; this.port = port; _ser = new KryoValuesSerializer(topoConf); + deser = new KryoValuesDeserializer(topoConf); // Configure the server. int buffer_size = ObjectReader.getInt(topoConf.get(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE)); @@ -143,12 +147,9 @@ class Server extends ConnectionWithStatus implements IStatefulObject, ISaslServe _cb = cb; } - /** - * register a newly created channel - * @param channel newly created channel - */ - protected void addChannel(Channel channel) { - allChannels.add(channel); + @Override + public void registerNewConnectionResponse(Supplier<Object> newConnectionResponse) { + this.newConnectionResponse = newConnectionResponse; } /** @@ -176,14 +177,17 @@ class Server extends ConnectionWithStatus implements IStatefulObject, ISaslServe } @Override - public void sendLoadMetrics(Map<Integer, Double> taskToLoad) { - try { - MessageBatch mb = new MessageBatch(1); - mb.add(new TaskMessage(-1, _ser.serialize(Arrays.asList((Object)taskToLoad)))); - allChannels.write(mb); - } catch (IOException e) { - throw new RuntimeException(e); - } + synchronized public void sendLoadMetrics(Map<Integer, Double> taskToLoad) { + MessageBatch mb = new MessageBatch(1); + mb.add(new TaskMessage(-1, _ser.serialize(Arrays.asList((Object)taskToLoad)))); + allChannels.write(mb); + } + + // this method expected to be thread safe + @Override + synchronized public void sendBackPressureStatus(BackPressureStatus bpStatus) { + LOG.info("Sending BackPressure status update to connected workers. BPStatus = {}", bpStatus); + allChannels.write(bpStatus); } @Override @@ -264,7 +268,10 @@ class Server extends ConnectionWithStatus implements IStatefulObject, ISaslServe /** Implementing IServer. **/ public void channelConnected(Channel c) { - addChannel(c); + if (newConnectionResponse != null) { + c.write( newConnectionResponse.get() ); // not synchronized since it is not yet in channel grp, so pvt to this thread + } + allChannels.add(c); } public void received(Object message, String remote, Channel channel) throws InterruptedException {
http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/messaging/netty/StormClientHandler.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/StormClientHandler.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/StormClientHandler.java index 3487003..a10fd02 100644 --- a/storm-client/src/jvm/org/apache/storm/messaging/netty/StormClientHandler.java +++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/StormClientHandler.java @@ -23,16 +23,14 @@ import org.apache.storm.serialization.KryoValuesDeserializer; import java.net.ConnectException; import java.util.Map; import java.util.List; -import java.io.IOException; +import java.util.concurrent.atomic.AtomicBoolean; -import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.ChannelStateEvent; import org.jboss.netty.channel.ExceptionEvent; import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.channel.SimpleChannelUpstreamHandler; -import org.jboss.netty.channel.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,9 +38,11 @@ public class StormClientHandler extends SimpleChannelUpstreamHandler { private static final Logger LOG = LoggerFactory.getLogger(StormClientHandler.class); private Client client; private KryoValuesDeserializer _des; + private AtomicBoolean[] remoteBpStatus; - StormClientHandler(Client client, Map<String, Object> conf) { + StormClientHandler(Client client, AtomicBoolean[] remoteBpStatus, Map<String, Object> conf) { this.client = client; + this.remoteBpStatus = remoteBpStatus; _des = new KryoValuesDeserializer(conf); } @@ -55,20 +55,29 @@ public class StormClientHandler extends SimpleChannelUpstreamHandler { if (msg==ControlMessage.FAILURE_RESPONSE) { LOG.info("failure response:{}", msg); } - } else if (message instanceof List) { - try { - //This should be the metrics, and there should only be one of them - List<TaskMessage> list = (List<TaskMessage>)message; - if (list.size() < 1) throw new RuntimeException("Didn't see enough load metrics ("+client.getDstAddress()+") "+list); - TaskMessage tm = ((List<TaskMessage>)message).get(list.size() - 1); - if (tm.task() != -1) throw new RuntimeException("Metrics messages are sent to the system task ("+client.getDstAddress()+") "+tm); - List metrics = _des.deserialize(tm.message()); - if (metrics.size() < 1) throw new RuntimeException("No metrics data in the metrics message ("+client.getDstAddress()+") "+metrics); - if (!(metrics.get(0) instanceof Map)) throw new RuntimeException("The metrics did not have a map in the first slot ("+client.getDstAddress()+") "+metrics); - client.setLoadMetrics((Map<Integer, Double>)metrics.get(0)); - } catch (IOException e) { - throw new RuntimeException(e); + } else if (message instanceof BackPressureStatus) { + BackPressureStatus status = (BackPressureStatus) message; + if (status.bpTasks != null) { + for (Integer bpTask : status.bpTasks) { + remoteBpStatus[bpTask].set(true); + } + } + if (status.nonBpTasks != null) { + for (Integer bpTask : status.nonBpTasks) { + remoteBpStatus[bpTask].set(false); + } } + LOG.debug("Received BackPressure status update : {}", status); + } else if (message instanceof List) { + //This should be the metrics, and there should only be one of them + List<TaskMessage> list = (List<TaskMessage>)message; + if (list.size() < 1) throw new RuntimeException("Didn't see enough load metrics ("+client.getDstAddress()+") "+list); + TaskMessage tm = ((List<TaskMessage>)message).get(list.size() - 1); + if (tm.task() != -1) throw new RuntimeException("Metrics messages are sent to the system task ("+client.getDstAddress()+") "+tm); + List metrics = _des.deserialize(tm.message()); + if (metrics.size() < 1) throw new RuntimeException("No metrics data in the metrics message ("+client.getDstAddress()+") "+metrics); + if (!(metrics.get(0) instanceof Map)) throw new RuntimeException("The metrics did not have a map in the first slot ("+client.getDstAddress()+") "+metrics); + client.setLoadMetrics((Map<Integer, Double>)metrics.get(0)); } else { throw new RuntimeException("Don't know how to handle a message of type " + message + " (" + client.getDstAddress() + ")"); http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/messaging/netty/StormClientPipelineFactory.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/StormClientPipelineFactory.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/StormClientPipelineFactory.java index 3fd7641..a16f3f5 100644 --- a/storm-client/src/jvm/org/apache/storm/messaging/netty/StormClientPipelineFactory.java +++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/StormClientPipelineFactory.java @@ -23,13 +23,16 @@ import org.jboss.netty.channel.Channels; import org.apache.storm.Config; import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; class StormClientPipelineFactory implements ChannelPipelineFactory { private Client client; + private AtomicBoolean[] remoteBpStatus; private Map<String, Object> conf; - StormClientPipelineFactory(Client client, Map<String, Object> conf) { + StormClientPipelineFactory(Client client, AtomicBoolean[] remoteBpStatus, Map<String, Object> conf) { this.client = client; + this.remoteBpStatus = remoteBpStatus; this.conf = conf; } @@ -38,9 +41,9 @@ class StormClientPipelineFactory implements ChannelPipelineFactory { ChannelPipeline pipeline = Channels.pipeline(); // Decoder - pipeline.addLast("decoder", new MessageDecoder()); + pipeline.addLast("decoder", new MessageDecoder(client.deser)); // Encoder - pipeline.addLast("encoder", new MessageEncoder()); + pipeline.addLast("encoder", new MessageEncoder(client.ser)); boolean isNettyAuth = (Boolean) conf .get(Config.STORM_MESSAGING_NETTY_AUTHENTICATION); @@ -50,7 +53,7 @@ class StormClientPipelineFactory implements ChannelPipelineFactory { client)); } // business logic. - pipeline.addLast("handler", new StormClientHandler(client, conf)); + pipeline.addLast("handler", new StormClientHandler(client, remoteBpStatus, conf)); return pipeline; } } http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/messaging/netty/StormServerPipelineFactory.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/StormServerPipelineFactory.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/StormServerPipelineFactory.java index 2e4f418..b67e264 100644 --- a/storm-client/src/jvm/org/apache/storm/messaging/netty/StormServerPipelineFactory.java +++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/StormServerPipelineFactory.java @@ -17,6 +17,8 @@ */ package org.apache.storm.messaging.netty; +import org.apache.storm.serialization.KryoValuesDeserializer; +import org.apache.storm.serialization.KryoValuesSerializer; import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelPipelineFactory; import org.jboss.netty.channel.Channels; @@ -35,9 +37,9 @@ class StormServerPipelineFactory implements ChannelPipelineFactory { ChannelPipeline pipeline = Channels.pipeline(); // Decoder - pipeline.addLast("decoder", new MessageDecoder()); + pipeline.addLast("decoder", new MessageDecoder(server.deser)); // Encoder - pipeline.addLast("encoder", new MessageEncoder()); + pipeline.addLast("encoder", new MessageEncoder(server._ser)); boolean isNettyAuth = (Boolean) this.server.topoConf .get(Config.STORM_MESSAGING_NETTY_AUTHENTICATION); http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/policy/IWaitStrategy.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/policy/IWaitStrategy.java b/storm-client/src/jvm/org/apache/storm/policy/IWaitStrategy.java new file mode 100644 index 0000000..f41492e --- /dev/null +++ b/storm-client/src/jvm/org/apache/storm/policy/IWaitStrategy.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +package org.apache.storm.policy; + +import org.apache.storm.Config; +import org.apache.storm.utils.ReflectionUtils; + +import java.util.Map; + + +public interface IWaitStrategy { + enum WAIT_SITUATION {BOLT_WAIT, BACK_PRESSURE_WAIT} + + void prepare(Map<String, Object> conf, WAIT_SITUATION waitSituation); + + /** + * Implementations of this method should be thread-safe (preferably no side-effects and lock-free) + * <p> + * Supports static or dynamic backoff. Dynamic backoff relies on idleCounter to + * estimate how long caller has been idling. + * <p> + * <pre> + * <code> + * int idleCounter = 0; + * int consumeCount = consumeFromQ(); + * while (consumeCount==0) { + * idleCounter = strategy.idle(idleCounter); + * consumeCount = consumeFromQ(); + * } + * </code> + * </pre> + * + * @param idleCounter managed by the idle method until reset + * @return new counter value to be used on subsequent idle cycle + */ + int idle(int idleCounter) throws InterruptedException; + + static IWaitStrategy createBackPressureWaitStrategy(Map<String, Object> topologyConf) { + IWaitStrategy producerWaitStrategy = ReflectionUtils.newInstance((String) topologyConf.get(Config.TOPOLOGY_BACKPRESSURE_WAIT_STRATEGY)); + producerWaitStrategy.prepare(topologyConf, WAIT_SITUATION.BACK_PRESSURE_WAIT); + return producerWaitStrategy; + } + + +} http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/policy/WaitStrategyPark.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/policy/WaitStrategyPark.java b/storm-client/src/jvm/org/apache/storm/policy/WaitStrategyPark.java new file mode 100644 index 0000000..0406fd2 --- /dev/null +++ b/storm-client/src/jvm/org/apache/storm/policy/WaitStrategyPark.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +package org.apache.storm.policy; + +import org.apache.storm.Config; +import org.apache.storm.utils.ObjectReader; + +import java.util.Map; +import java.util.concurrent.locks.LockSupport; + +public class WaitStrategyPark implements IWaitStrategy { + private long parkTimeNanoSec; + + @Override + public void prepare(Map<String, Object> conf, WAIT_SITUATION waitSituation) { + if (waitSituation == WAIT_SITUATION.BOLT_WAIT) { + parkTimeNanoSec = 1_000 * ObjectReader.getLong(conf.get(Config.TOPOLOGY_BOLT_WAIT_PARK_MICROSEC)); + } else if (waitSituation == WAIT_SITUATION.BACK_PRESSURE_WAIT) { + parkTimeNanoSec = 1_000 * ObjectReader.getLong(conf.get(Config.TOPOLOGY_BACKPRESSURE_WAIT_PARK_MICROSEC)); + } else { + throw new IllegalArgumentException("Unknown wait situation : " + waitSituation); + } + } + + public WaitStrategyPark() { // required for instantiation via reflection. must call prepare() thereafter + } + + // Convenience alternative to prepare() for use in Tests + public WaitStrategyPark(long microsec) { + parkTimeNanoSec = microsec * 1_000; + } + + + @Override + public int idle(int idleCounter) throws InterruptedException { + if (parkTimeNanoSec == 0) { + return 1; + } + LockSupport.parkNanos(parkTimeNanoSec); + return idleCounter + 1; + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/policy/WaitStrategyProgressive.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/policy/WaitStrategyProgressive.java b/storm-client/src/jvm/org/apache/storm/policy/WaitStrategyProgressive.java new file mode 100644 index 0000000..067ca71 --- /dev/null +++ b/storm-client/src/jvm/org/apache/storm/policy/WaitStrategyProgressive.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +package org.apache.storm.policy; + +import org.apache.storm.Config; +import org.apache.storm.utils.ObjectReader; + +import java.util.Map; +import java.util.concurrent.locks.LockSupport; + +/** + * A Progressive Wait Strategy + * <p> Has three levels of idling. Stays in each level for a configured number of iterations before entering the next level. + * Level 1 - No idling. Returns immediately. Stays in this level for `level1Count` iterations. + * Level 2 - Calls LockSupport.parkNanos(1). Stays in this level for `level2Count` iterations + * Level 3 - Calls Thread.sleep(). Stays in this level until wait situation changes. + * + * <p> + * The initial spin can be useful to prevent downstream bolt from repeatedly sleeping/parking when + * the upstream component is a bit relatively slower. Allows downstream bolt can enter deeper wait states only + * if the traffic to it appears to have reduced. + * <p> + */ +public class WaitStrategyProgressive implements IWaitStrategy { + private int level1Count; + private int level2Count; + private long level3SleepMs; + + @Override + public void prepare(Map<String, Object> conf, WAIT_SITUATION waitSituation) { + if (waitSituation == WAIT_SITUATION.BOLT_WAIT) { + level1Count = ObjectReader.getInt(conf.get(Config.TOPOLOGY_BOLT_WAIT_PROGRESSIVE_LEVEL1_COUNT)); + level2Count = ObjectReader.getInt(conf.get(Config.TOPOLOGY_BOLT_WAIT_PROGRESSIVE_LEVEL2_COUNT)); + level3SleepMs = ObjectReader.getLong(conf.get(Config.TOPOLOGY_BOLT_WAIT_PROGRESSIVE_LEVEL3_SLEEP_MILLIS)); + } else if (waitSituation == WAIT_SITUATION.BACK_PRESSURE_WAIT) { + level1Count = ObjectReader.getInt(conf.get(Config.TOPOLOGY_BACKPRESSURE_WAIT_PROGRESSIVE_LEVEL1_COUNT)); + level2Count = ObjectReader.getInt(conf.get(Config.TOPOLOGY_BACKPRESSURE_WAIT_PROGRESSIVE_LEVEL2_COUNT)); + level3SleepMs = ObjectReader.getLong(conf.get(Config.TOPOLOGY_BACKPRESSURE_WAIT_PROGRESSIVE_LEVEL3_SLEEP_MILLIS)); + } else { + throw new IllegalArgumentException("Unknown wait situation : " + waitSituation); + } + } + + @Override + public int idle(int idleCounter) throws InterruptedException { + if (idleCounter < level1Count) { // level 1 - no waiting + ++idleCounter; + } else if (idleCounter < level1Count * level2Count) { // level 2 - parkNanos(1L) + ++idleCounter; + LockSupport.parkNanos(1L); + } else { // level 3 - longer idling with Thread.sleep() + Thread.sleep(level3SleepMs); + } + return idleCounter; + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/serialization/KryoTupleDeserializer.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/serialization/KryoTupleDeserializer.java b/storm-client/src/jvm/org/apache/storm/serialization/KryoTupleDeserializer.java index f2d8bf1..1be7558 100644 --- a/storm-client/src/jvm/org/apache/storm/serialization/KryoTupleDeserializer.java +++ b/storm-client/src/jvm/org/apache/storm/serialization/KryoTupleDeserializer.java @@ -19,7 +19,6 @@ package org.apache.storm.serialization; import org.apache.storm.task.GeneralTopologyContext; import org.apache.storm.tuple.MessageId; -import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.TupleImpl; import com.esotericsoftware.kryo.io.Input; import java.io.IOException; @@ -39,7 +38,8 @@ public class KryoTupleDeserializer implements ITupleDeserializer { _kryoInput = new Input(1); } - public Tuple deserialize(byte[] ser) { + @Override + public TupleImpl deserialize(byte[] ser) { try { _kryoInput.setBuffer(ser); int taskId = _kryoInput.readInt(true); @@ -48,7 +48,7 @@ public class KryoTupleDeserializer implements ITupleDeserializer { String streamName = _ids.getStreamName(componentName, streamId); MessageId id = MessageId.deserialize(_kryoInput); List<Object> values = _kryo.deserializeFrom(_kryoInput); - return new TupleImpl(_context, values, taskId, streamName, id); + return new TupleImpl(_context, values, componentName, taskId, streamName, id); } catch(IOException e) { throw new RuntimeException(e); } http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/serialization/KryoValuesDeserializer.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/serialization/KryoValuesDeserializer.java b/storm-client/src/jvm/org/apache/storm/serialization/KryoValuesDeserializer.java index 2ef326a..2d83584 100644 --- a/storm-client/src/jvm/org/apache/storm/serialization/KryoValuesDeserializer.java +++ b/storm-client/src/jvm/org/apache/storm/serialization/KryoValuesDeserializer.java @@ -38,12 +38,12 @@ public class KryoValuesDeserializer { return delegate.getDelegate(); } - public List<Object> deserialize(byte[] ser) throws IOException { + public List<Object> deserialize(byte[] ser) { _kryoInput.setBuffer(ser); return deserializeFrom(_kryoInput); } - public Object deserializeObject(byte[] ser) throws IOException { + public Object deserializeObject(byte[] ser) { _kryoInput.setBuffer(ser); return _kryo.readClassAndObject(_kryoInput); } http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/serialization/KryoValuesSerializer.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/serialization/KryoValuesSerializer.java b/storm-client/src/jvm/org/apache/storm/serialization/KryoValuesSerializer.java index af41e15..8c5c5c1 100644 --- a/storm-client/src/jvm/org/apache/storm/serialization/KryoValuesSerializer.java +++ b/storm-client/src/jvm/org/apache/storm/serialization/KryoValuesSerializer.java @@ -20,7 +20,6 @@ package org.apache.storm.serialization; import org.apache.storm.utils.ListDelegate; import com.esotericsoftware.kryo.Kryo; import com.esotericsoftware.kryo.io.Output; -import java.io.IOException; import java.util.List; import java.util.Map; @@ -35,7 +34,7 @@ public class KryoValuesSerializer { _kryoOut = new Output(2000, 2000000000); } - public void serializeInto(List<Object> values, Output out) throws IOException { + public void serializeInto(List<Object> values, Output out) { // this ensures that list of values is always written the same way, regardless // of whether it's a java collection or one of clojure's persistent collections // (which have different serializers) @@ -44,7 +43,7 @@ public class KryoValuesSerializer { _kryo.writeObject(out, _delegate); } - public byte[] serialize(List<Object> values) throws IOException { + public byte[] serialize(List<Object> values) { _kryoOut.clear(); serializeInto(values, _kryoOut); return _kryoOut.toBytes(); http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/serialization/SerializationFactory.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/serialization/SerializationFactory.java b/storm-client/src/jvm/org/apache/storm/serialization/SerializationFactory.java index d6c54a3..8f69b17 100644 --- a/storm-client/src/jvm/org/apache/storm/serialization/SerializationFactory.java +++ b/storm-client/src/jvm/org/apache/storm/serialization/SerializationFactory.java @@ -20,6 +20,7 @@ package org.apache.storm.serialization; import org.apache.storm.Config; import org.apache.storm.generated.ComponentCommon; import org.apache.storm.generated.StormTopology; +import org.apache.storm.messaging.netty.BackPressureStatus; import org.apache.storm.serialization.types.ArrayListSerializer; import org.apache.storm.serialization.types.HashMapSerializer; import org.apache.storm.serialization.types.HashSetSerializer; @@ -72,6 +73,7 @@ public class SerializationFactory { k.register(org.apache.storm.metric.api.IMetricsConsumer.DataPoint.class); k.register(org.apache.storm.metric.api.IMetricsConsumer.TaskInfo.class); k.register(ConsList.class); + k.register(BackPressureStatus.class); synchronized (loader) { for (SerializationRegister sr: loader) { http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/spout/ISpoutOutputCollector.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/spout/ISpoutOutputCollector.java b/storm-client/src/jvm/org/apache/storm/spout/ISpoutOutputCollector.java index 4624cf7..f8a5a6c 100644 --- a/storm-client/src/jvm/org/apache/storm/spout/ISpoutOutputCollector.java +++ b/storm-client/src/jvm/org/apache/storm/spout/ISpoutOutputCollector.java @@ -21,6 +21,11 @@ import org.apache.storm.task.IErrorReporter; import java.util.List; +/** + * Methods are not expected to be thread safe. Each thread expected to have a separate instance of this type of object, or else + * externally synchronize any shared instance. + */ + public interface ISpoutOutputCollector extends IErrorReporter{ /** Returns the task ids that received the tuples. @@ -28,5 +33,6 @@ public interface ISpoutOutputCollector extends IErrorReporter{ List<Integer> emit(String streamId, List<Object> tuple, Object messageId); void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId); long getPendingCount(); + void flush(); } http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/spout/SpoutOutputCollector.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/spout/SpoutOutputCollector.java b/storm-client/src/jvm/org/apache/storm/spout/SpoutOutputCollector.java index 8a7beb6..7666d1c 100644 --- a/storm-client/src/jvm/org/apache/storm/spout/SpoutOutputCollector.java +++ b/storm-client/src/jvm/org/apache/storm/spout/SpoutOutputCollector.java @@ -1,4 +1,4 @@ -/* +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -63,7 +63,7 @@ public class SpoutOutputCollector implements ISpoutOutputCollector { * @return the list of task ids that this tuple was sent to */ public List<Integer> emit(List<Object> tuple, Object messageId) { - return emit(Utils.DEFAULT_STREAM_ID, tuple, messageId); + return _delegate.emit(Utils.DEFAULT_STREAM_ID, tuple, messageId); } /** @@ -133,6 +133,11 @@ public class SpoutOutputCollector implements ISpoutOutputCollector { } @Override + public void flush() { + _delegate.flush(); + } + + @Override public void reportError(Throwable error) { _delegate.reportError(error); } http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/state/DefaultStateSerializer.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/state/DefaultStateSerializer.java b/storm-client/src/jvm/org/apache/storm/state/DefaultStateSerializer.java index bb61921..5d6ef17 100644 --- a/storm-client/src/jvm/org/apache/storm/state/DefaultStateSerializer.java +++ b/storm-client/src/jvm/org/apache/storm/state/DefaultStateSerializer.java @@ -125,7 +125,7 @@ public class DefaultStateSerializer<T> implements Serializer<T> { public TupleImpl read(Kryo kryo, Input input, Class<TupleImpl> type) { int length = input.readInt(); byte[] bytes = input.readBytes(length); - return (TupleImpl) tupleDeserializer.deserialize(bytes); + return tupleDeserializer.deserialize(bytes); } } } http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/stats/BoltExecutorStats.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/stats/BoltExecutorStats.java b/storm-client/src/jvm/org/apache/storm/stats/BoltExecutorStats.java index 1bacab2..78246bb 100644 --- a/storm-client/src/jvm/org/apache/storm/stats/BoltExecutorStats.java +++ b/storm-client/src/jvm/org/apache/storm/stats/BoltExecutorStats.java @@ -30,40 +30,49 @@ import java.util.List; @SuppressWarnings("unchecked") public class BoltExecutorStats extends CommonStats { - public static final String ACKED = "acked"; - public static final String FAILED = "failed"; - public static final String EXECUTED = "executed"; - public static final String PROCESS_LATENCIES = "process-latencies"; - public static final String EXECUTE_LATENCIES = "execute-latencies"; + MultiCountStatAndMetric ackedStats; + MultiCountStatAndMetric failedStats; + MultiCountStatAndMetric executedStats; + MultiLatencyStatAndMetric processLatencyStats; + MultiLatencyStatAndMetric executeLatencyStats; public BoltExecutorStats(int rate,int numStatBuckets) { super(rate,numStatBuckets); - - this.put(ACKED, new MultiCountStatAndMetric(numStatBuckets)); - this.put(FAILED, new MultiCountStatAndMetric(numStatBuckets)); - this.put(EXECUTED, new MultiCountStatAndMetric(numStatBuckets)); - this.put(PROCESS_LATENCIES, new MultiLatencyStatAndMetric(numStatBuckets)); - this.put(EXECUTE_LATENCIES, new MultiLatencyStatAndMetric(numStatBuckets)); + this.ackedStats = new MultiCountStatAndMetric(numStatBuckets); + this.failedStats = new MultiCountStatAndMetric(numStatBuckets); + this.executedStats = new MultiCountStatAndMetric(numStatBuckets); + this.processLatencyStats = new MultiLatencyStatAndMetric(numStatBuckets); + this.executeLatencyStats = new MultiLatencyStatAndMetric(numStatBuckets); } public MultiCountStatAndMetric getAcked() { - return (MultiCountStatAndMetric) this.get(ACKED); + return ackedStats; } public MultiCountStatAndMetric getFailed() { - return (MultiCountStatAndMetric) this.get(FAILED); + return failedStats; } public MultiCountStatAndMetric getExecuted() { - return (MultiCountStatAndMetric) this.get(EXECUTED); + return executedStats; } public MultiLatencyStatAndMetric getProcessLatencies() { - return (MultiLatencyStatAndMetric) this.get(PROCESS_LATENCIES); + return processLatencyStats; } public MultiLatencyStatAndMetric getExecuteLatencies() { - return (MultiLatencyStatAndMetric) this.get(EXECUTE_LATENCIES); + return executeLatencyStats; + } + + @Override + public void cleanupStats() { + ackedStats.close(); + failedStats.close(); + executedStats.close(); + processLatencyStats.close(); + executeLatencyStats.close(); + super.cleanupStats(); } public void boltExecuteTuple(String component, String stream, long latencyMs) { @@ -88,17 +97,17 @@ public class BoltExecutorStats extends CommonStats { public ExecutorStats renderStats() { ExecutorStats ret = new ExecutorStats(); // common stats - ret.set_emitted(valueStat(EMITTED)); - ret.set_transferred(valueStat(TRANSFERRED)); + ret.set_emitted(valueStat(getEmitted())); + ret.set_transferred(valueStat(getTransferred())); ret.set_rate(this.rate); // bolt stats BoltStats boltStats = new BoltStats( - StatsUtil.windowSetConverter(valueStat(ACKED), StatsUtil.TO_GSID, StatsUtil.IDENTITY), - StatsUtil.windowSetConverter(valueStat(FAILED), StatsUtil.TO_GSID, StatsUtil.IDENTITY), - StatsUtil.windowSetConverter(valueStat(PROCESS_LATENCIES), StatsUtil.TO_GSID, StatsUtil.IDENTITY), - StatsUtil.windowSetConverter(valueStat(EXECUTED), StatsUtil.TO_GSID, StatsUtil.IDENTITY), - StatsUtil.windowSetConverter(valueStat(EXECUTE_LATENCIES), StatsUtil.TO_GSID, StatsUtil.IDENTITY)); + StatsUtil.windowSetConverter(valueStat(ackedStats), StatsUtil.TO_GSID, StatsUtil.IDENTITY), + StatsUtil.windowSetConverter(valueStat(failedStats), StatsUtil.TO_GSID, StatsUtil.IDENTITY), + StatsUtil.windowSetConverter(valueStat(processLatencyStats), StatsUtil.TO_GSID, StatsUtil.IDENTITY), + StatsUtil.windowSetConverter(valueStat(executedStats), StatsUtil.TO_GSID, StatsUtil.IDENTITY), + StatsUtil.windowSetConverter(valueStat(executeLatencyStats), StatsUtil.TO_GSID, StatsUtil.IDENTITY)); ret.set_specific(ExecutorSpecificStats.bolt(boltStats)); return ret; http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/stats/CommonStats.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/stats/CommonStats.java b/storm-client/src/jvm/org/apache/storm/stats/CommonStats.java index a66a92c..f115d12 100644 --- a/storm-client/src/jvm/org/apache/storm/stats/CommonStats.java +++ b/storm-client/src/jvm/org/apache/storm/stats/CommonStats.java @@ -17,29 +17,23 @@ */ package org.apache.storm.stats; -import java.util.HashMap; import java.util.Map; import org.apache.storm.generated.ExecutorStats; -import org.apache.storm.metric.api.IMetric; import org.apache.storm.metric.internal.MultiCountStatAndMetric; import org.apache.storm.metric.internal.MultiLatencyStatAndMetric; @SuppressWarnings("unchecked") public abstract class CommonStats { - public static final String RATE = "rate"; - - public static final String EMITTED = "emitted"; - public static final String TRANSFERRED = "transferred"; - public static final String[] COMMON_FIELDS = {EMITTED, TRANSFERRED}; + private final MultiCountStatAndMetric emittedStats; + private final MultiCountStatAndMetric transferredStats; protected final int rate; - protected final Map<String, IMetric> metricMap = new HashMap<>(); public CommonStats(int rate,int numStatBuckets) { this.rate = rate; - this.put(EMITTED, new MultiCountStatAndMetric(numStatBuckets)); - this.put(TRANSFERRED, new MultiCountStatAndMetric(numStatBuckets)); + this.emittedStats = new MultiCountStatAndMetric(numStatBuckets); + this.transferredStats = new MultiCountStatAndMetric(numStatBuckets); } public int getRate() { @@ -47,19 +41,11 @@ public abstract class CommonStats { } public MultiCountStatAndMetric getEmitted() { - return (MultiCountStatAndMetric) get(EMITTED); + return emittedStats; } public MultiCountStatAndMetric getTransferred() { - return (MultiCountStatAndMetric) get(TRANSFERRED); - } - - public IMetric get(String field) { - return (IMetric) StatsUtil.getByKey(metricMap, field); - } - - protected void put(String field, Object value) { - StatsUtil.putKV(metricMap, field, value); + return transferredStats; } public void emittedTuple(String stream) { @@ -71,42 +57,16 @@ public abstract class CommonStats { } public void cleanupStats() { - for (Object imetric : this.metricMap.values()) { - cleanupStat((IMetric) imetric); - } - } - - private void cleanupStat(IMetric metric) { - if (metric instanceof MultiCountStatAndMetric) { - ((MultiCountStatAndMetric) metric).close(); - } else if (metric instanceof MultiLatencyStatAndMetric) { - ((MultiLatencyStatAndMetric) metric).close(); - } + emittedStats.close(); + transferredStats.close(); } - protected Map valueStats(String[] fields) { - Map ret = new HashMap(); - for (String field : fields) { - IMetric metric = this.get(field); - if (metric instanceof MultiCountStatAndMetric) { - StatsUtil.putKV(ret, field, ((MultiCountStatAndMetric) metric).getTimeCounts()); - } else if (metric instanceof MultiLatencyStatAndMetric) { - StatsUtil.putKV(ret, field, ((MultiLatencyStatAndMetric) metric).getTimeLatAvg()); - } - } - StatsUtil.putKV(ret, CommonStats.RATE, this.getRate()); - - return ret; + protected Map valueStat(MultiCountStatAndMetric metric) { + return metric.getTimeCounts(); } - protected Map valueStat(String field) { - IMetric metric = this.get(field); - if (metric instanceof MultiCountStatAndMetric) { - return ((MultiCountStatAndMetric) metric).getTimeCounts(); - } else if (metric instanceof MultiLatencyStatAndMetric) { - return ((MultiLatencyStatAndMetric) metric).getTimeLatAvg(); - } - return null; + protected Map valueStat(MultiLatencyStatAndMetric metric) { + return metric.getTimeLatAvg(); } public abstract ExecutorStats renderStats(); http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/stats/SpoutExecutorStats.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/stats/SpoutExecutorStats.java b/storm-client/src/jvm/org/apache/storm/stats/SpoutExecutorStats.java index 0addbdc..6c3d589 100644 --- a/storm-client/src/jvm/org/apache/storm/stats/SpoutExecutorStats.java +++ b/storm-client/src/jvm/org/apache/storm/stats/SpoutExecutorStats.java @@ -26,27 +26,35 @@ import org.apache.storm.metric.internal.MultiLatencyStatAndMetric; @SuppressWarnings("unchecked") public class SpoutExecutorStats extends CommonStats { - public static final String ACKED = "acked"; - public static final String FAILED = "failed"; - public static final String COMPLETE_LATENCIES = "complete-latencies"; + private final MultiCountStatAndMetric ackedStats; + private final MultiCountStatAndMetric failedStats; + private final MultiLatencyStatAndMetric completeLatencyStats; public SpoutExecutorStats(int rate,int numStatBuckets) { super(rate,numStatBuckets); - this.put(ACKED, new MultiCountStatAndMetric(numStatBuckets)); - this.put(FAILED, new MultiCountStatAndMetric(numStatBuckets)); - this.put(COMPLETE_LATENCIES, new MultiLatencyStatAndMetric(numStatBuckets)); + this.ackedStats = new MultiCountStatAndMetric(numStatBuckets); + this.failedStats = new MultiCountStatAndMetric(numStatBuckets); + this.completeLatencyStats = new MultiLatencyStatAndMetric(numStatBuckets); } public MultiCountStatAndMetric getAcked() { - return (MultiCountStatAndMetric) this.get(ACKED); + return ackedStats; } public MultiCountStatAndMetric getFailed() { - return (MultiCountStatAndMetric) this.get(FAILED); + return failedStats; } public MultiLatencyStatAndMetric getCompleteLatencies() { - return (MultiLatencyStatAndMetric) this.get(COMPLETE_LATENCIES); + return completeLatencyStats; + } + + @Override + public void cleanupStats() { + ackedStats.close(); + failedStats.close(); + completeLatencyStats.close(); + super.cleanupStats(); } public void spoutAckedTuple(String stream, long latencyMs) { @@ -62,13 +70,13 @@ public class SpoutExecutorStats extends CommonStats { public ExecutorStats renderStats() { ExecutorStats ret = new ExecutorStats(); // common fields - ret.set_emitted(valueStat(EMITTED)); - ret.set_transferred(valueStat(TRANSFERRED)); + ret.set_emitted(valueStat(getEmitted())); + ret.set_transferred(valueStat(getTransferred())); ret.set_rate(this.rate); // spout stats SpoutStats spoutStats = new SpoutStats( - valueStat(ACKED), valueStat(FAILED), valueStat(COMPLETE_LATENCIES)); + valueStat(ackedStats), valueStat(failedStats), valueStat(completeLatencyStats)); ret.set_specific(ExecutorSpecificStats.spout(spoutStats)); return ret; http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/task/GeneralTopologyContext.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/task/GeneralTopologyContext.java b/storm-client/src/jvm/org/apache/storm/task/GeneralTopologyContext.java index deae7cf..d59c9cf 100644 --- a/storm-client/src/jvm/org/apache/storm/task/GeneralTopologyContext.java +++ b/storm-client/src/jvm/org/apache/storm/task/GeneralTopologyContext.java @@ -24,6 +24,7 @@ import org.apache.storm.generated.GlobalStreamId; import org.apache.storm.generated.Grouping; import org.apache.storm.generated.StormTopology; import org.apache.storm.tuple.Fields; +import org.apache.storm.utils.ConfigUtils; import org.apache.storm.utils.ObjectReader; import org.apache.storm.utils.ThriftTopologyUtils; import java.util.ArrayList; @@ -41,8 +42,9 @@ public class GeneralTopologyContext implements JSONAware { private Map<String, List<Integer>> _componentToTasks; private Map<String, Map<String, Fields>> _componentToStreamToFields; private String _stormId; - protected Map _topoConf; - + protected Map<String, Object> _topoConf; + protected boolean _doSanityCheck; + // pass in componentToSortedTasks for the case of running tons of tasks in single executor public GeneralTopologyContext(StormTopology topology, Map<String, Object> topoConf, Map<Integer, String> taskToComponent, Map<String, List<Integer>> componentToSortedTasks, @@ -53,6 +55,7 @@ public class GeneralTopologyContext implements JSONAware { _stormId = stormId; _componentToTasks = componentToSortedTasks; _componentToStreamToFields = componentToStreamToFields; + _doSanityCheck = ConfigUtils.isLocalMode(_topoConf); } /** @@ -203,4 +206,8 @@ public class GeneralTopologyContext implements JSONAware { public Map<String, Object> getConf() { return _topoConf; } + + public boolean doSanityCheck() { + return _doSanityCheck; + } } http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/task/IOutputCollector.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/task/IOutputCollector.java b/storm-client/src/jvm/org/apache/storm/task/IOutputCollector.java index cda4d9f..f740060 100644 --- a/storm-client/src/jvm/org/apache/storm/task/IOutputCollector.java +++ b/storm-client/src/jvm/org/apache/storm/task/IOutputCollector.java @@ -18,6 +18,7 @@ package org.apache.storm.task; import org.apache.storm.tuple.Tuple; + import java.util.Collection; import java.util.List; @@ -30,4 +31,5 @@ public interface IOutputCollector extends IErrorReporter { void ack(Tuple input); void fail(Tuple input); void resetTimeout(Tuple input); + void flush(); } http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/task/OutputCollector.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/task/OutputCollector.java b/storm-client/src/jvm/org/apache/storm/task/OutputCollector.java index 24f58ac..7ce9ee0 100644 --- a/storm-client/src/jvm/org/apache/storm/task/OutputCollector.java +++ b/storm-client/src/jvm/org/apache/storm/task/OutputCollector.java @@ -1,4 +1,4 @@ -/* +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -233,4 +233,9 @@ public class OutputCollector implements IOutputCollector { public void reportError(Throwable error) { _delegate.reportError(error); } + + @Override + public void flush() { + _delegate.flush(); + } } http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/task/TopologyContext.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/task/TopologyContext.java b/storm-client/src/jvm/org/apache/storm/task/TopologyContext.java index 601d70f..0a1765b 100644 --- a/storm-client/src/jvm/org/apache/storm/task/TopologyContext.java +++ b/storm-client/src/jvm/org/apache/storm/task/TopologyContext.java @@ -1,4 +1,4 @@ -/* +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -32,7 +32,6 @@ import org.apache.storm.tuple.Fields; import org.apache.storm.utils.Utils; import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -272,7 +271,7 @@ public class TopologyContext extends WorkerTopologyContext implements IMetricsCo _hooks.add(hook); } - public Collection<ITaskHook> getHooks() { + public List<ITaskHook> getHooks() { return _hooks; } http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/testing/SpoutTracker.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/testing/SpoutTracker.java b/storm-client/src/jvm/org/apache/storm/testing/SpoutTracker.java index 33d62ec..5f75eae 100644 --- a/storm-client/src/jvm/org/apache/storm/testing/SpoutTracker.java +++ b/storm-client/src/jvm/org/apache/storm/testing/SpoutTracker.java @@ -62,6 +62,11 @@ public class SpoutTracker extends BaseRichSpout { } @Override + public void flush() { + _collector.flush(); + } + + @Override public void reportError(Throwable error) { _collector.reportError(error); } http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/trident/operation/TridentCollector.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/trident/operation/TridentCollector.java b/storm-client/src/jvm/org/apache/storm/trident/operation/TridentCollector.java index 7392c9c..e8899ed 100644 --- a/storm-client/src/jvm/org/apache/storm/trident/operation/TridentCollector.java +++ b/storm-client/src/jvm/org/apache/storm/trident/operation/TridentCollector.java @@ -42,6 +42,11 @@ public interface TridentCollector { void emit(List<Object> values); /** + * Flush any buffered tuples (when batching is enabled) + */ + void flush(); + + /** * Reports an error. The corresponding stack trace will be visible in the Storm UI. * * Note that calling this method does not alter the processing of a batch. To explicitly fail a batch and trigger http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/trident/operation/impl/CaptureCollector.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/trident/operation/impl/CaptureCollector.java b/storm-client/src/jvm/org/apache/storm/trident/operation/impl/CaptureCollector.java index c187857..df4561a 100644 --- a/storm-client/src/jvm/org/apache/storm/trident/operation/impl/CaptureCollector.java +++ b/storm-client/src/jvm/org/apache/storm/trident/operation/impl/CaptureCollector.java @@ -36,6 +36,11 @@ public class CaptureCollector implements TridentCollector { } @Override + public void flush() { + _coll.flush(); + } + + @Override public void reportError(Throwable t) { _coll.reportError(t); } http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/trident/operation/impl/GroupCollector.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/trident/operation/impl/GroupCollector.java b/storm-client/src/jvm/org/apache/storm/trident/operation/impl/GroupCollector.java index 0f0d014..e70cc28 100644 --- a/storm-client/src/jvm/org/apache/storm/trident/operation/impl/GroupCollector.java +++ b/storm-client/src/jvm/org/apache/storm/trident/operation/impl/GroupCollector.java @@ -41,6 +41,11 @@ public class GroupCollector implements TridentCollector { } @Override + public void flush() { + _collector.flush(); + } + + @Override public void reportError(Throwable t) { _collector.reportError(t); } http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/trident/planner/BridgeReceiver.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/trident/planner/BridgeReceiver.java b/storm-client/src/jvm/org/apache/storm/trident/planner/BridgeReceiver.java index 9626f98..6ccea23 100644 --- a/storm-client/src/jvm/org/apache/storm/trident/planner/BridgeReceiver.java +++ b/storm-client/src/jvm/org/apache/storm/trident/planner/BridgeReceiver.java @@ -34,5 +34,9 @@ public class BridgeReceiver implements TupleReceiver { public void execute(ProcessorContext context, String streamId, TridentTuple tuple) { _collector.emit(streamId, new ConsList(context.batchId, tuple)); } - + + @Override + public void flush() { + _collector.flush(); + } } http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/trident/planner/TupleReceiver.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/trident/planner/TupleReceiver.java b/storm-client/src/jvm/org/apache/storm/trident/planner/TupleReceiver.java index 4b96465..1c5a1d6 100644 --- a/storm-client/src/jvm/org/apache/storm/trident/planner/TupleReceiver.java +++ b/storm-client/src/jvm/org/apache/storm/trident/planner/TupleReceiver.java @@ -23,5 +23,5 @@ import org.apache.storm.trident.tuple.TridentTuple; public interface TupleReceiver { //streaId indicates where tuple came from void execute(ProcessorContext processorContext, String streamId, TridentTuple tuple); - + void flush(); } http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/trident/planner/processor/AggregateProcessor.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/trident/planner/processor/AggregateProcessor.java b/storm-client/src/jvm/org/apache/storm/trident/planner/processor/AggregateProcessor.java index b849418..90efd5a 100644 --- a/storm-client/src/jvm/org/apache/storm/trident/planner/processor/AggregateProcessor.java +++ b/storm-client/src/jvm/org/apache/storm/trident/planner/processor/AggregateProcessor.java @@ -70,7 +70,12 @@ public class AggregateProcessor implements TridentProcessor { _collector.setContext(processorContext); _agg.aggregate(processorContext.state[_context.getStateIndex()], _projection.create(tuple), _collector); } - + + @Override + public void flush() { + _collector.flush(); + } + @Override public void finishBatch(ProcessorContext processorContext) { _collector.setContext(processorContext); http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/trident/planner/processor/AppendCollector.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/trident/planner/processor/AppendCollector.java b/storm-client/src/jvm/org/apache/storm/trident/planner/processor/AppendCollector.java index a4ac745..36664b0 100644 --- a/storm-client/src/jvm/org/apache/storm/trident/planner/processor/AppendCollector.java +++ b/storm-client/src/jvm/org/apache/storm/trident/planner/processor/AppendCollector.java @@ -52,6 +52,13 @@ public class AppendCollector implements TridentCollector { } @Override + public void flush() { + for(TupleReceiver r: _triContext.getReceivers()) { + r.flush(); + } + } + + @Override public void reportError(Throwable t) { _triContext.getDelegateCollector().reportError(t); } http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/trident/planner/processor/EachProcessor.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/trident/planner/processor/EachProcessor.java b/storm-client/src/jvm/org/apache/storm/trident/planner/processor/EachProcessor.java index c9b4508..2cadd22 100644 --- a/storm-client/src/jvm/org/apache/storm/trident/planner/processor/EachProcessor.java +++ b/storm-client/src/jvm/org/apache/storm/trident/planner/processor/EachProcessor.java @@ -55,6 +55,11 @@ public class EachProcessor implements TridentProcessor { } @Override + public void flush() { + _collector.flush(); + } + + @Override public void cleanup() { _function.cleanup(); } @@ -65,6 +70,7 @@ public class EachProcessor implements TridentProcessor { _function.execute(_projection.create(tuple), _collector); } + @Override public void startBatch(ProcessorContext processorContext) { } http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/trident/planner/processor/FreshCollector.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/trident/planner/processor/FreshCollector.java b/storm-client/src/jvm/org/apache/storm/trident/planner/processor/FreshCollector.java index 999b54a..6a57943 100644 --- a/storm-client/src/jvm/org/apache/storm/trident/planner/processor/FreshCollector.java +++ b/storm-client/src/jvm/org/apache/storm/trident/planner/processor/FreshCollector.java @@ -49,6 +49,13 @@ public class FreshCollector implements TridentCollector { } @Override + public void flush() { + for(TupleReceiver r: _triContext.getReceivers()) { + r.flush(); + } + } + + @Override public void reportError(Throwable t) { _triContext.getDelegateCollector().reportError(t); } http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/trident/planner/processor/MapProcessor.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/trident/planner/processor/MapProcessor.java b/storm-client/src/jvm/org/apache/storm/trident/planner/processor/MapProcessor.java index 110cdaf..9e7d603 100644 --- a/storm-client/src/jvm/org/apache/storm/trident/planner/processor/MapProcessor.java +++ b/storm-client/src/jvm/org/apache/storm/trident/planner/processor/MapProcessor.java @@ -24,6 +24,7 @@ import org.apache.storm.trident.operation.MapFunction; import org.apache.storm.trident.operation.TridentOperationContext; import org.apache.storm.trident.planner.ProcessorContext; import org.apache.storm.trident.planner.TridentProcessor; +import org.apache.storm.trident.planner.TupleReceiver; import org.apache.storm.trident.tuple.TridentTuple; import org.apache.storm.trident.tuple.TridentTupleView; import org.apache.storm.tuple.Fields; @@ -71,6 +72,11 @@ public class MapProcessor implements TridentProcessor { } @Override + public void flush() { + _collector.flush(); + } + + @Override public void startBatch(ProcessorContext processorContext) { // NOOP } http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/trident/planner/processor/MultiReducerProcessor.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/trident/planner/processor/MultiReducerProcessor.java b/storm-client/src/jvm/org/apache/storm/trident/planner/processor/MultiReducerProcessor.java index 77dead1..1b7feb4 100644 --- a/storm-client/src/jvm/org/apache/storm/trident/planner/processor/MultiReducerProcessor.java +++ b/storm-client/src/jvm/org/apache/storm/trident/planner/processor/MultiReducerProcessor.java @@ -79,7 +79,11 @@ public class MultiReducerProcessor implements TridentProcessor { int i = _streamToIndex.get(streamId); _reducer.execute(processorContext.state[_context.getStateIndex()], i, _projectionFactories[i].create(tuple), _collector); } - + + @Override + public void flush() { + _collector.flush(); + } @Override public void finishBatch(ProcessorContext processorContext) { _collector.setContext(processorContext); http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/trident/planner/processor/PartitionPersistProcessor.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/trident/planner/processor/PartitionPersistProcessor.java b/storm-client/src/jvm/org/apache/storm/trident/planner/processor/PartitionPersistProcessor.java index af4c8f6..e00c15c 100644 --- a/storm-client/src/jvm/org/apache/storm/trident/planner/processor/PartitionPersistProcessor.java +++ b/storm-client/src/jvm/org/apache/storm/trident/planner/processor/PartitionPersistProcessor.java @@ -77,6 +77,11 @@ public class PartitionPersistProcessor implements TridentProcessor { } @Override + public void flush() { + // NO-OP + } + + @Override public void finishBatch(ProcessorContext processorContext) { _collector.setContext(processorContext); Object batchId = processorContext.batchId; http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/trident/planner/processor/ProjectedProcessor.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/trident/planner/processor/ProjectedProcessor.java b/storm-client/src/jvm/org/apache/storm/trident/planner/processor/ProjectedProcessor.java index 4ecb251..d80591e 100644 --- a/storm-client/src/jvm/org/apache/storm/trident/planner/processor/ProjectedProcessor.java +++ b/storm-client/src/jvm/org/apache/storm/trident/planner/processor/ProjectedProcessor.java @@ -63,6 +63,13 @@ public class ProjectedProcessor implements TridentProcessor { } @Override + public void flush() { + for(TupleReceiver r: _context.getReceivers()) { + r.flush(); + } + } + + @Override public void finishBatch(ProcessorContext processorContext) { } http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/trident/planner/processor/StateQueryProcessor.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/trident/planner/processor/StateQueryProcessor.java b/storm-client/src/jvm/org/apache/storm/trident/planner/processor/StateQueryProcessor.java index 2d9ebc0..1d1d515 100644 --- a/storm-client/src/jvm/org/apache/storm/trident/planner/processor/StateQueryProcessor.java +++ b/storm-client/src/jvm/org/apache/storm/trident/planner/processor/StateQueryProcessor.java @@ -79,6 +79,11 @@ public class StateQueryProcessor implements TridentProcessor { } @Override + public void flush() { + // NO-OP + } + + @Override public void finishBatch(ProcessorContext processorContext) { BatchState state = (BatchState) processorContext.state[_context.getStateIndex()]; if(!state.tuples.isEmpty()) { http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/trident/spout/RichSpoutBatchExecutor.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/trident/spout/RichSpoutBatchExecutor.java b/storm-client/src/jvm/org/apache/storm/trident/spout/RichSpoutBatchExecutor.java index dd950d3..9f2a251 100644 --- a/storm-client/src/jvm/org/apache/storm/trident/spout/RichSpoutBatchExecutor.java +++ b/storm-client/src/jvm/org/apache/storm/trident/spout/RichSpoutBatchExecutor.java @@ -194,7 +194,12 @@ public class RichSpoutBatchExecutor implements ITridentSpout<Object> { public void emitDirect(int task, String stream, List<Object> values, Object id) { throw new UnsupportedOperationException("Trident does not support direct streams"); } - + + @Override + public void flush() { + _collector.flush(); + } + @Override public long getPendingCount() { return pendingCount; http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/trident/spout/RichSpoutBatchTriggerer.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/trident/spout/RichSpoutBatchTriggerer.java b/storm-client/src/jvm/org/apache/storm/trident/spout/RichSpoutBatchTriggerer.java index af98465..9447f7a 100644 --- a/storm-client/src/jvm/org/apache/storm/trident/spout/RichSpoutBatchTriggerer.java +++ b/storm-client/src/jvm/org/apache/storm/trident/spout/RichSpoutBatchTriggerer.java @@ -171,6 +171,11 @@ public class RichSpoutBatchTriggerer implements IRichSpout { } @Override + public void flush() { + _collector.flush(); + } + + @Override public void reportError(Throwable t) { _collector.reportError(t); } http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/trident/spout/TridentSpoutExecutor.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/trident/spout/TridentSpoutExecutor.java b/storm-client/src/jvm/org/apache/storm/trident/spout/TridentSpoutExecutor.java index b60a7bd..fa9827c 100644 --- a/storm-client/src/jvm/org/apache/storm/trident/spout/TridentSpoutExecutor.java +++ b/storm-client/src/jvm/org/apache/storm/trident/spout/TridentSpoutExecutor.java @@ -131,6 +131,11 @@ public class TridentSpoutExecutor implements ITridentBatchBolt { } @Override + public void flush() { + _delegate.flush(); + } + + @Override public void reportError(Throwable t) { _delegate.reportError(t); } http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/trident/topology/TridentBoltExecutor.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/trident/topology/TridentBoltExecutor.java b/storm-client/src/jvm/org/apache/storm/trident/topology/TridentBoltExecutor.java index 88d222c..a0b048f 100644 --- a/storm-client/src/jvm/org/apache/storm/trident/topology/TridentBoltExecutor.java +++ b/storm-client/src/jvm/org/apache/storm/trident/topology/TridentBoltExecutor.java @@ -185,7 +185,12 @@ public class TridentBoltExecutor implements IRichBolt { public void resetTimeout(Tuple tuple) { throw new IllegalStateException("Method should never be called"); } - + + @Override + public void flush() { + _delegate.flush(); + } + public void reportError(Throwable error) { _delegate.reportError(error); } @@ -208,7 +213,7 @@ public class TridentBoltExecutor implements IRichBolt { TopologyContext _context; @Override - public void prepare(Map<String, Object> conf, TopologyContext context, OutputCollector collector) { + public void prepare(Map<String, Object> conf, TopologyContext context, OutputCollector collector) { _messageTimeoutMs = context.maxTopologyMessageTimeout() * 1000L; _lastRotate = System.currentTimeMillis(); _batches = new RotatingMap<>(2); http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java b/storm-client/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java index e412eb1..672fad6 100644 --- a/storm-client/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java +++ b/storm-client/src/jvm/org/apache/storm/trident/windowing/AbstractTridentWindowManager.java @@ -181,6 +181,11 @@ public abstract class AbstractTridentWindowManager<T> implements ITridentWindowM } @Override + public void flush() { + // NO-OP + } + + @Override public void reportError(Throwable t) { delegateCollector.reportError(t); } http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/trident/windowing/WindowTridentProcessor.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/trident/windowing/WindowTridentProcessor.java b/storm-client/src/jvm/org/apache/storm/trident/windowing/WindowTridentProcessor.java index 721063a..c1f9d5b 100644 --- a/storm-client/src/jvm/org/apache/storm/trident/windowing/WindowTridentProcessor.java +++ b/storm-client/src/jvm/org/apache/storm/trident/windowing/WindowTridentProcessor.java @@ -148,6 +148,11 @@ public class WindowTridentProcessor implements TridentProcessor { } @Override + public void flush() { + // NO-OP + } + + @Override public void finishBatch(ProcessorContext processorContext) { Object batchId = processorContext.batchId; http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/tuple/AddressedTuple.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/tuple/AddressedTuple.java b/storm-client/src/jvm/org/apache/storm/tuple/AddressedTuple.java index 0ea1291..c7661a5 100644 --- a/storm-client/src/jvm/org/apache/storm/tuple/AddressedTuple.java +++ b/storm-client/src/jvm/org/apache/storm/tuple/AddressedTuple.java @@ -17,10 +17,13 @@ */ package org.apache.storm.tuple; +import org.apache.storm.Constants; +import org.apache.storm.task.GeneralTopologyContext; + /** * A Tuple that is addressed to a destination. */ -public class AddressedTuple { +public final class AddressedTuple { /** * Destination used when broadcasting a tuple. */ @@ -45,4 +48,10 @@ public class AddressedTuple { public String toString() { return "[dest: "+dest+" tuple: "+tuple+"]"; } + + public static AddressedTuple createFlushTuple(GeneralTopologyContext workerTopologyContext) { + TupleImpl tuple = new TupleImpl(workerTopologyContext, new Values(), Constants.SYSTEM_COMPONENT_ID, + (int) Constants.SYSTEM_TASK_ID, Constants.SYSTEM_FLUSH_STREAM_ID); + return new AddressedTuple(AddressedTuple.BROADCAST_DEST, tuple); // one instance per executor avoids false sharing of CPU cache + } } http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/tuple/MessageId.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/tuple/MessageId.java b/storm-client/src/jvm/org/apache/storm/tuple/MessageId.java index 6883995..bce7946 100644 --- a/storm-client/src/jvm/org/apache/storm/tuple/MessageId.java +++ b/storm-client/src/jvm/org/apache/storm/tuple/MessageId.java @@ -20,6 +20,7 @@ package org.apache.storm.tuple; import com.esotericsoftware.kryo.io.Input; import com.esotericsoftware.kryo.io.Output; import java.io.IOException; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; @@ -27,14 +28,16 @@ import java.util.Random; import java.util.Set; public class MessageId { + final static MessageId unanchoredMsgId = makeId(Collections.emptyMap()); + private Map<Long, Long> _anchorsToIds; - + public static long generateId(Random rand) { return rand.nextLong(); } public static MessageId makeUnanchored() { - return makeId(new HashMap<Long, Long>()); + return unanchoredMsgId; } public static MessageId makeId(Map<Long, Long> anchorsToIds) { http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/tuple/TupleImpl.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/tuple/TupleImpl.java b/storm-client/src/jvm/org/apache/storm/tuple/TupleImpl.java index 9356c4b..28dae6d 100644 --- a/storm-client/src/jvm/org/apache/storm/tuple/TupleImpl.java +++ b/storm-client/src/jvm/org/apache/storm/tuple/TupleImpl.java @@ -24,50 +24,55 @@ import org.apache.storm.generated.GlobalStreamId; import org.apache.storm.task.GeneralTopologyContext; public class TupleImpl implements Tuple { - private final List<Object> values; - private final int taskId; - private final String streamId; - private final GeneralTopologyContext context; - private final MessageId id; + private List<Object> values; + private int taskId; + private String streamId; + private GeneralTopologyContext context; + private MessageId id; + private final String srcComponent; private Long _processSampleStartTime; private Long _executeSampleStartTime; private long _outAckVal = 0; - + public TupleImpl(Tuple t) { this.values = t.getValues(); this.taskId = t.getSourceTask(); this.streamId = t.getSourceStreamId(); this.id = t.getMessageId(); this.context = t.getContext(); - if (t instanceof TupleImpl) { + this.srcComponent = t.getSourceComponent(); + try { TupleImpl ti = (TupleImpl) t; this._processSampleStartTime = ti._processSampleStartTime; this._executeSampleStartTime = ti._executeSampleStartTime; this._outAckVal = ti._outAckVal; + } catch (ClassCastException e) { + // ignore ... if t is not a TupleImpl type .. faster than checking and then casting } } - public TupleImpl(GeneralTopologyContext context, List<Object> values, int taskId, String streamId, MessageId id) { + public TupleImpl(GeneralTopologyContext context, List<Object> values, String srcComponent, int taskId, String streamId, MessageId id) { this.values = Collections.unmodifiableList(values); this.taskId = taskId; this.streamId = streamId; this.id = id; this.context = context; - - String componentId = context.getComponentId(taskId); - Fields schema = context.getComponentOutputFields(componentId, streamId); - if(values.size()!=schema.size()) { - throw new IllegalArgumentException( - "Tuple created with wrong number of fields. " + - "Expected " + schema.size() + " fields but got " + - values.size() + " fields"); + this.srcComponent = srcComponent; + + if (context.doSanityCheck()) { + String componentId = context.getComponentId(taskId); + Fields schema = context.getComponentOutputFields(componentId, streamId); + if (values.size() != schema.size()) { + throw new IllegalArgumentException("Tuple created with wrong number of fields. Expected " + schema.size() + + " fields but got " + values.size() + " fields"); + } } } - public TupleImpl(GeneralTopologyContext context, List<Object> values, int taskId, String streamId) { - this(context, values, taskId, streamId, MessageId.makeUnanchored()); - } - + public TupleImpl(GeneralTopologyContext context, List<Object> values, String srcComponent, int taskId, String streamId) { + this(context, values, srcComponent, taskId, streamId, MessageId.makeUnanchored()); + } + public void setProcessSampleStartTime(long ms) { _processSampleStartTime = ms; } @@ -83,7 +88,7 @@ public class TupleImpl implements Tuple { public Long getExecuteSampleStartTime() { return _executeSampleStartTime; } - + public void updateAckVal(long val) { _outAckVal = _outAckVal ^ val; } @@ -91,7 +96,7 @@ public class TupleImpl implements Tuple { public long getAckVal() { return _outAckVal; } - + /** Tuple APIs*/ @Override public int size() { @@ -213,7 +218,7 @@ public class TupleImpl implements Tuple { return values; } - @Override + @Override public Fields getFields() { return context.getComponentOutputFields(getSourceComponent(), getSourceStreamId()); } @@ -235,7 +240,7 @@ public class TupleImpl implements Tuple { @Override public String getSourceComponent() { - return context.getComponentId(taskId); + return srcComponent; } @Override @@ -257,7 +262,7 @@ public class TupleImpl implements Tuple { public GeneralTopologyContext getContext() { return context; } - + @Override public String toString() { return "source: " + getSourceComponent() + ":" + taskId + ", stream: " + streamId + ", id: "+ id.toString() + ", " + values.toString() + " PROC_START_TIME(sampled): " + _processSampleStartTime + " EXEC_START_TIME(sampled): " + _executeSampleStartTime; http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/utils/ConfigUtils.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/utils/ConfigUtils.java b/storm-client/src/jvm/org/apache/storm/utils/ConfigUtils.java index 1bc94ac..0d5bab8 100644 --- a/storm-client/src/jvm/org/apache/storm/utils/ConfigUtils.java +++ b/storm-client/src/jvm/org/apache/storm/utils/ConfigUtils.java @@ -1,4 +1,4 @@ -/* +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -30,6 +30,7 @@ import java.util.Map; import java.util.Random; import java.util.concurrent.Callable; import java.util.stream.Collectors; +import java.util.function.BooleanSupplier; import org.apache.commons.io.FileUtils; import org.apache.storm.Config; @@ -144,19 +145,19 @@ public class ConfigUtils { throw new IllegalArgumentException("Illegal topology.stats.sample.rate in conf: " + rate); } - public static Callable<Boolean> mkStatsSampler(Map<String, Object> conf) { + public static BooleanSupplier mkStatsSampler(Map<String, Object> conf) { return evenSampler(samplingRate(conf)); } - public static Callable<Boolean> evenSampler(final int samplingFreq) { + public static BooleanSupplier evenSampler(final int samplingFreq) { final Random random = new Random(); - return new Callable<Boolean>() { + return new BooleanSupplier() { private int curr = -1; private int target = random.nextInt(samplingFreq); @Override - public Boolean call() throws Exception { + public boolean getAsBoolean() { curr++; if (curr >= samplingFreq) { curr = 0; http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/utils/DisruptorBackpressureCallback.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/utils/DisruptorBackpressureCallback.java b/storm-client/src/jvm/org/apache/storm/utils/DisruptorBackpressureCallback.java deleted file mode 100644 index 69be7be..0000000 --- a/storm-client/src/jvm/org/apache/storm/utils/DisruptorBackpressureCallback.java +++ /dev/null @@ -1,27 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.storm.utils; - -public interface DisruptorBackpressureCallback { - - void highWaterMark() throws Exception; - - void lowWaterMark() throws Exception; -}