Github user HeartSaVioR commented on a diff in the pull request:
https://github.com/apache/storm/pull/2241#discussion_r159034082
--- Diff: storm-client/src/jvm/org/apache/storm/utils/JCQueue.java ---
@@ -0,0 +1,457 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.storm.utils;
+
+import org.apache.storm.policy.IWaitStrategy;
+import org.apache.storm.metric.api.IStatefulObject;
+import org.apache.storm.metric.internal.RateTracker;
+import org.jctools.queues.MessagePassingQueue;
+import org.jctools.queues.MpscArrayQueue;
+import org.jctools.queues.MpscUnboundedArrayQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+
+public final class JCQueue implements IStatefulObject {
+ private static final Logger LOG = LoggerFactory.getLogger(JCQueue.class);
+
+ public static final Object INTERRUPT = new Object(); // marker object that haltWithInterrupt() tries to enqueue
+
+ private final ThroughputMeter emptyMeter = new ThroughputMeter("EmptyBatch"); // recorded by consumeImpl() when a pass drains nothing
+ private final ExitCondition continueRunning = () -> true; // exit condition that never stops; used by consume(Consumer)
+
+ /** Strategy used by JCQueue to move objects into the recvQueue, either directly or via a batch. */
+ private interface Inserter {
+     /** Blocking insert; can be interrupted using Thread.interrupt(). */
+     void publish(Object obj) throws InterruptedException;
+
+     /** Blocking flush of whatever this inserter has buffered. */
+     void flush() throws InterruptedException;
+
+     /** Non-blocking insert; the return value indicates success/failure. */
+     boolean tryPublish(Object obj);
+
+     /** Non-blocking flush; the return value indicates success/failure. */
+     boolean tryFlush();
+ }
+
+ /**
+  * Inserter that writes each object straight into the recvQueue, without batching.
+  * Thread safe: it keeps no state besides the owning queue, so the same instance can be used across multiple threads.
+  */
+ private static class DirectInserter implements Inserter {
+     // final, because this one instance is shared by every producer thread
+     private final JCQueue q;
+
+     public DirectInserter(JCQueue q) {
+         this.q = q;
+     }
+
+     /**
+      * Blocking call, that can be interrupted via Thread.interrupt.
+      * Retries until the object is accepted, idling on the queue's back pressure wait strategy between attempts.
+      *
+      * @param obj the object to enqueue
+      * @throws InterruptedException if the thread is found to be interrupted after an idle step
+      */
+     @Override
+     public void publish(Object obj) throws InterruptedException {
+         int idleCount = 0;
+         while (!q.tryPublishInternal(obj)) {
+             q.metrics.notifyInsertFailure();
+             if (idleCount == 0) { // check avoids multiple log msgs when in a idle loop
+                 LOG.debug("Experiencing Back Pressure on recvQueue: '{}'. Entering BackPressure Wait", q.getName());
+             }
+
+             idleCount = q.backPressureWaitStrategy.idle(idleCount);
+             if (Thread.interrupted()) {
+                 throw new InterruptedException();
+             }
+         }
+     }
+
+     /**
+      * Non-Blocking call. Records an insert failure in the queue metrics when the object is rejected.
+      *
+      * @return true if the object was enqueued, false otherwise
+      */
+     @Override
+     public boolean tryPublish(Object obj) {
+         if (q.tryPublishInternal(obj)) {
+             return true;
+         }
+         q.metrics.notifyInsertFailure();
+         return false;
+     }
+
+     /** No-op: nothing is ever buffered by this inserter. */
+     @Override
+     public void flush() throws InterruptedException {
+         // intentionally empty
+     }
+
+     /** No-op: nothing is ever buffered by this inserter, so this always reports success. */
+     @Override
+     public boolean tryFlush() {
+         return true;
+     }
+ } // class DirectInserter
+
+ /**
+  * Inserter that buffers objects in a local batch and pushes them to the recvQueue in bulk.
+  * The batch is a plain ArrayList with no synchronization; JCQueue keeps one instance per thread in a ThreadLocal.
+  */
+ private static class BatchInserter implements Inserter {
+     private final JCQueue q;
+     private final int batchSz;
+     // elements accepted by publish/tryPublish that have not yet reached the recvQueue
+     private final ArrayList<Object> currentBatch;
+
+     public BatchInserter(JCQueue q, int batchSz) {
+         this.q = q;
+         this.batchSz = batchSz;
+         this.currentBatch = new ArrayList<>(batchSz + 1);
+     }
+
+     /**
+      * Adds obj to the current batch. Blocks (inside flush()) only once the batch has reached batchSz elements.
+      *
+      * @throws InterruptedException if the thread is interrupted while flush() is waiting
+      */
+     @Override
+     public void publish(Object obj) throws InterruptedException {
+         currentBatch.add(obj);
+         if (currentBatch.size() >= batchSz) {
+             flush();
+         }
+     }
+
+     /**
+      * Non-Blocking call. If the batch is already full it is flushed first; obj is only buffered when that succeeds.
+      *
+      * @return true if obj was added to the batch, false if the full batch could not be flushed
+      */
+     @Override
+     public boolean tryPublish(Object obj) {
+         if (currentBatch.size() >= batchSz) {
+             if (!tryFlush()) {
+                 return false;
+             }
+         }
+         currentBatch.add(obj);
+         return true;
+     }
+
+     /**
+      * Blocking call - Does not return until at least 1 element is drained or Thread.interrupt() is received.
+      * Uses backpressure wait strategy. Elements that were not drained stay in the batch.
+      *
+      * @throws InterruptedException if the thread is found to be interrupted after an idle step
+      */
+     @Override
+     public void flush() throws InterruptedException {
+         if (currentBatch.isEmpty()) {
+             return;
+         }
+         int publishCount = q.tryPublishInternal(currentBatch);
+         int retryCount = 0;
+         while (publishCount == 0) { // retry till at least 1 element is drained
+             q.metrics.notifyInsertFailure();
+             if (retryCount == 0) { // check avoids multiple log msgs when in a idle loop
+                 LOG.debug("Experiencing Back Pressure when flushing batch to Q: {}. Entering BackPressure Wait.", q.getName());
+             }
+             retryCount = q.backPressureWaitStrategy.idle(retryCount);
+             if (Thread.interrupted()) {
+                 throw new InterruptedException();
+             }
+             publishCount = q.tryPublishInternal(currentBatch);
+         }
+         // drop only the prefix that made it into the recvQueue
+         currentBatch.subList(0, publishCount).clear();
+     }
+
+     /**
+      * Non blocking call. Tries to flush as many as possible.
+      *
+      * @return true if at least one element from a non-empty currentBatch was flushed or if currentBatch is empty;
+      *         false otherwise
+      */
+     @Override
+     public boolean tryFlush() {
+         if (currentBatch.isEmpty()) {
+             return true;
+         }
+         int publishCount = q.tryPublishInternal(currentBatch);
+         if (publishCount == 0) {
+             q.metrics.notifyInsertFailure();
+             return false;
+         }
+         // drop only the prefix that made it into the recvQueue
+         currentBatch.subList(0, publishCount).clear();
+         return true;
+     }
+ } // class BatchInserter
+
+ /**
+  * This inner class provides methods to access the metrics of the recvQueue, plus the current overflowQ size.
+  */
+ public class QueueMetrics {
+     private final RateTracker arrivalsTracker = new RateTracker(10000, 10);
+     private final RateTracker insertFailuresTracker = new RateTracker(10000, 10);
+     private final AtomicLong droppedMessages = new AtomicLong(0);
+
+     /** Returns the current number of elements in recvQueue (overflowQ is not counted). */
+     public long population() {
+         return recvQueue.size();
+     }
+
+     /** Returns the capacity reported by recvQueue. */
+     public long capacity() {
+         return recvQueue.capacity();
+     }
+
+     /**
+      * Builds a map of the current metric values.
+      *
+      * @return a HashMap with the keys capacity, pct_full, population, arrival_rate_secs, sojourn_time_ms,
+      *         insert_failures, dropped_messages and overflow
+      */
+     public Object getState() {
+         HashMap<String, Object> state = new HashMap<>();
+
+         final double arrivalRateInSecs = arrivalsTracker.reportRate();
+
+         long tuplePop = population();
+
+         // Assume the recvQueue is stable, in which the arrival rate is equal to the consumption rate.
+         // If this assumption does not hold, the calculation of sojourn time should also consider
+         // departure rate according to Queuing Theory.
+         // The lower bound on the rate keeps the division finite when nothing has arrived.
+         final double sojournTime = tuplePop / Math.max(arrivalRateInSecs, 0.00001) * 1000.0;
+
+         long cap = capacity();
+         float pctFull = (1.0F * tuplePop / cap);
+
+         state.put("capacity", cap);
+         state.put("pct_full", pctFull);
+         state.put("population", tuplePop);
+
+         state.put("arrival_rate_secs", arrivalRateInSecs);
+         state.put("sojourn_time_ms", sojournTime); //element sojourn time in milliseconds
+         state.put("insert_failures", insertFailuresTracker.reportRate());
+         // store the value, not the live counter, so the reported state does not change after it is returned
+         state.put("dropped_messages", droppedMessages.get());
+         state.put("overflow", overflowQ.size());
+         return state;
+     }
+
+     /** Reports {@code counts} new arrivals to the arrival rate tracker. */
+     public void notifyArrivals(long counts) {
+         arrivalsTracker.notify(counts);
+     }
+
+     /** Reports one failed insert attempt to the insert-failure rate tracker. */
+     public void notifyInsertFailure() {
+         insertFailuresTracker.notify(1);
+     }
+
+     /** Increments the dropped message counter by one. */
+     public void notifyDroppedMsg() {
+         droppedMessages.incrementAndGet();
+     }
+
+     /** Closes both rate trackers. */
+     public void close() {
+         arrivalsTracker.close();
+         insertFailuresTracker.close();
+     }
+
+ }
+
+ private final MpscArrayQueue<Object> recvQueue; // primary queue: filled by the inserters, drained first by consumeImpl()
+ private final MpscUnboundedArrayQueue<Object> overflowQ; // only holds msgs from other workers (via WorkerTransfer), when recvQueue is full
+ private final int overflowLimit; // ensures... overflowCount <= overflowLimit. if set to 0, disables overflow. NOTE(review): tryPublishToOverflow() skips the limit check when this is 0, i.e. 0 removes the limit - confirm the intended wording
+
+
+ private final int producerBatchSz; // clamped by the constructor to at least 1 and at most size / 2
+ private final DirectInserter directInserter = new DirectInserter(this); // un-batched inserter, returned by getInserter() when producerBatchSz is 1
+
+ private final ThreadLocal<BatchInserter> thdLocalBatcher = new ThreadLocal<BatchInserter>(); // per-thread BatchInserter, created lazily by getInserter()
+
+ private final JCQueue.QueueMetrics metrics;
+
+ private String queueName;
+ private final IWaitStrategy backPressureWaitStrategy; // idled on by the blocking publish/flush paths while nothing can be inserted
+
+ public JCQueue(String queueName, int size, int overflowLimit, int
producerBatchSz, IWaitStrategy backPressureWaitStrategy) {
+ this.queueName = queueName;
+ this.overflowLimit = overflowLimit;
+ this.recvQueue = new MpscArrayQueue<>(size);
+ this.overflowQ = new MpscUnboundedArrayQueue<>(size);
+
+ this.metrics = new JCQueue.QueueMetrics();
+
+ //The batch size can be no larger than half the full recvQueue
size, to avoid contention issues.
+ this.producerBatchSz = Math.max(1, Math.min(producerBatchSz, size
/ 2));
+ this.backPressureWaitStrategy = backPressureWaitStrategy;
+ }
+
+ /** Returns this queue's name. */
+ public String getName() {
+     return this.queueName;
+ }
+
+
+ /**
+  * Makes a single non-blocking attempt to enqueue the INTERRUPT marker, then closes the metrics trackers.
+  *
+  * @return true if the marker was enqueued, false if the recvQueue did not accept it
+  */
+ public boolean haltWithInterrupt() {
+     final boolean interruptQueued = tryPublishInternal(INTERRUPT);
+     metrics.close();
+     return interruptQueued;
+ }
+
+
+ /**
+  * Non blocking. Returns immediately if Q is empty. Drains with an exit condition that never stops early.
+  *
+  * @return number of elements consumed from Q
+  */
+ public int consume(JCQueue.Consumer consumer) {
+     final ExitCondition neverStop = continueRunning;
+     return consume(consumer, neverStop);
+ }
+
+ /**
+  * Non blocking. Returns immediately if Q is empty. Runs till Q is empty OR exitCond.keepRunning() return false.
+  *
+  * @param consumer receives every drained element
+  * @param exitCond checked before each element is taken
+  * @return number of elements consumed from Q
+  * @throws RuntimeException wrapping the InterruptedException if draining is interrupted; the thread's
+  *                          interrupt status is set again before the exception is thrown
+  */
+ public int consume(JCQueue.Consumer consumer, ExitCondition exitCond) {
+     try {
+         return consumeImpl(consumer, exitCond);
+     } catch (InterruptedException e) {
+         // re-assert the interrupt so code further up the stack can still observe it
+         Thread.currentThread().interrupt();
+         throw new RuntimeException(e);
+     }
+ }
+
+ public int size() { return recvQueue.size() + overflowQ.size(); }
+
+ /**
+  * Non blocking. Returns immediately if Q is empty. Drains recvQueue first, then at most as many overflowQ
+  * elements as were present when the overflow pass started.
+  *
+  * @param consumer receives every drained element; its flush() is called once if anything was drained
+  * @param exitCond checked before each element is taken; draining stops when keepRunning() returns false
+  * @return number of elements consumed from Q
+  * @throws InterruptedException if consumer.flush() is interrupted
+  */
+ private int consumeImpl(Consumer consumer, ExitCondition exitCond) throws InterruptedException {
+     int drainCount = 0;
+     while (exitCond.keepRunning()) {
+         Object tuple = recvQueue.poll();
+         if (tuple == null) {
+             break;
+         }
+         consumer.accept(tuple);
+         ++drainCount;
+     }
+
+     int overflowDrainCount = 0;
+     int limit = overflowQ.size();
+     while (exitCond.keepRunning() && (overflowDrainCount < limit)) { // 2nd cond prevents staying stuck with consuming overflow
+         Object tuple = overflowQ.poll();
+         if (tuple == null) { // never hand null to the consumer, same as the recvQueue loop
+             break;
+         }
+         consumer.accept(tuple);
+         ++overflowDrainCount;
+     }
+     int total = drainCount + overflowDrainCount;
+     if (total > 0) {
+         consumer.flush();
+     } else {
+         emptyMeter.record();
+     }
+     return total;
+ }
+
+ // Non Blocking. returns true/false indicating success/failure. Fails
if full.
+ private boolean tryPublishInternal(Object obj) {
+ if (recvQueue.offer(obj)) {
+ metrics.notifyArrivals(1);
+ return true;
+ }
+ return false;
+ }
+
+ // Non Blocking. Feeds the elements of objs, in list order, to recvQueue.fill() and counts the
+ // accepted ones as arrivals. Returns count of how many inserts succeeded.
+ private int tryPublishInternal(ArrayList<Object> objs) {
+     // the iterator is created once; fill() pulls one element from it per accepted insert
+     final int inserted = recvQueue.fill(objs.iterator()::next, objs.size());
+     metrics.notifyArrivals(inserted);
+     return inserted;
+ }
+
+ // Picks the inserter for the calling thread: the shared direct inserter when batching is off,
+ // otherwise this thread's BatchInserter, which is created on first use.
+ private Inserter getInserter() {
+     if (producerBatchSz <= 1) {
+         return directInserter;
+     }
+     BatchInserter batcher = thdLocalBatcher.get();
+     if (batcher == null) {
+         batcher = new BatchInserter(this, producerBatchSz);
+         thdLocalBatcher.set(batcher);
+     }
+     return batcher;
+ }
+
+ /**
+  * Blocking call. Hands obj to the calling thread's inserter, which retries till it can successfully publish.
+  * Can be interrupted via Thread.interrupt().
+  */
+ public void publish(Object obj) throws InterruptedException {
+     getInserter().publish(obj);
+ }
+
+ /**
+  * Non-blocking call made through the calling thread's inserter, returns false if full.
+  **/
+ public boolean tryPublish(Object obj) {
+     return getInserter().tryPublish(obj);
+ }
+
+ /**
+  * Non-blocking call. Bypasses any batching that may be enabled on the recvQueue.
+  * Intended for sending flush/metrics tuples.
+  */
+ public boolean tryPublishDirect(Object obj) {
+     final boolean published = tryPublishInternal(obj);
+     return published;
+ }
+
+ /**
+  * Un-batched write to overflowQ. Should only be called by WorkerTransfer.
+  * The limit is only checked when overflowLimit is greater than 0.
+  *
+  * @return false if overflowLimit has been reached, true once obj has been added
+  */
+ public boolean tryPublishToOverflow(Object obj) {
+     final boolean limitEnabled = overflowLimit > 0;
+     if (limitEnabled && overflowQ.size() >= overflowLimit) {
+         return false;
+     }
+     overflowQ.add(obj);
+     return true;
+ }
+
+ /** Counts one dropped message in this queue's metrics. */
+ public void recordMsgDrop() {
+     metrics.notifyDroppedMsg();
+ }
+
+ /** Returns true if overflowQ currently reports itself as empty. */
+ public boolean isEmptyOverflow() {
+     final boolean overflowEmpty = overflowQ.isEmpty();
+     return overflowEmpty;
+ }
+
+ /** Returns the current number of elements in overflowQ. */
+ public int getOverflowCount() {
+     final int overflowed = overflowQ.size();
+     return overflowed;
+ }
+
+ /** Returns the current number of elements in recvQueue (overflowQ is not counted). */
+ public int getQueuedCount() {
+     final int queued = recvQueue.size();
+     return queued;
+ }
+
+ /**
+  * Flushes the calling thread's inserter.
+  * if(batchSz>1)  : Blocking call. Does not return until at least 1 element is drained or Thread.interrupt() is received
+  * if(batchSz==1) : NO-OP. Returns immediately. doesnt throw.
+  */
+ public void flush() throws InterruptedException {
+     getInserter().flush();
+ }
+
+ /**
+  * Tries to flush the calling thread's inserter without blocking.
+  * if(batchSz>1)  : Non-Blocking call. Tries to flush as many as it can. Returns true if flushed at least 1.
+  * if(batchSz==1) : This is a NO-OP. Returns true immediately.
+  */
+ public boolean tryFlush() {
+     return getInserter().tryFlush();
+ }
+
+ /** Returns the metrics state produced by this queue's QueueMetrics. */
+ @Override
+ public Object getState() {
+     final Object metricsState = metrics.getState();
+     return metricsState;
+ }
+
+
+ /** Exposes the metrics holder so that it can be accessed from outside of the JCQueue class. */
+ public JCQueue.QueueMetrics getMetrics() {
+     return this.metrics;
+ }
+
+ /**
+  * Callback handed to consume(): accept() is called for each drained element, and flush() is called
+  * once at the end of a pass that drained at least one element.
+  */
+ public interface Consumer extends org.jctools.queues.MessagePassingQueue.Consumer<Object> {
+     @Override
+     void accept(Object event);
+
+     void flush() throws InterruptedException;
+ }
+
+
+ public interface ExitCondition {
--- End diff --
OK makes sense. Let's keep it as it is.
---