GitHub user roshannaik commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2241#discussion_r158941280
  
    --- Diff: storm-client/src/jvm/org/apache/storm/utils/JCQueue.java ---
    @@ -0,0 +1,457 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License
    + */
    +
    +package org.apache.storm.utils;
    +
    +import org.apache.storm.policy.IWaitStrategy;
    +import org.apache.storm.metric.api.IStatefulObject;
    +import org.apache.storm.metric.internal.RateTracker;
    +import org.jctools.queues.MessagePassingQueue;
    +import org.jctools.queues.MpscArrayQueue;
    +import org.jctools.queues.MpscUnboundedArrayQueue;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.concurrent.atomic.AtomicLong;
    +
    +
    +public final class JCQueue implements IStatefulObject {
    +    private static final Logger LOG = 
LoggerFactory.getLogger(JCQueue.class);
    +
    +    public static final Object INTERRUPT = new Object();
    +
    +    private final ThroughputMeter emptyMeter = new 
ThroughputMeter("EmptyBatch");
    +    private final ExitCondition continueRunning = () -> true;
    +
    +    private interface Inserter {
    +        // blocking call that can be interrupted using Thread.interrupt()
    +        void publish(Object obj) throws InterruptedException;
    +        boolean tryPublish(Object obj);
    +
    +        void flush() throws InterruptedException;
    +        boolean tryFlush();
    +    }
    +
    +    /* Thread safe. Same instance can be used across multiple threads */
    +    private static class DirectInserter implements Inserter {
    +        private JCQueue q;
    +
    +        public DirectInserter(JCQueue q) {
    +            this.q = q;
    +        }
    +
    +        /** Blocking call, that can be interrupted via Thread.interrupt */
    +        @Override
    +        public void publish(Object obj) throws InterruptedException {
    +            boolean inserted = q.tryPublishInternal(obj);
    +            int idleCount = 0;
    +            while (!inserted) {
    +                q.metrics.notifyInsertFailure();
    +                if (idleCount==0) { // check avoids multiple log msgs when 
in a idle loop
    +                    LOG.debug("Experiencing Back Pressure on recvQueue: 
'{}'. Entering BackPressure Wait", q.getName());
    +                }
    +
    +                idleCount = q.backPressureWaitStrategy.idle(idleCount);
    +                if (Thread.interrupted()) {
    +                    throw new InterruptedException();
    +                }
    +                inserted = q.tryPublishInternal(obj);
    +            }
    +
    +        }
    +
    +        /** Non-Blocking call. return value indicates success/failure */
    +        @Override
    +        public boolean tryPublish(Object obj) {
    +            boolean inserted = q.tryPublishInternal(obj);
    +            if (!inserted) {
    +                q.metrics.notifyInsertFailure();
    +                return false;
    +            }
    +            return true;
    +        }
    +
    +        @Override
    +        public void flush() throws InterruptedException {
    +            return;
    +        }
    +
    +        @Override
    +        public boolean tryFlush() {
    +            return true;
    +        }
    +    } // class DirectInserter
    +
    +    private static class BatchInserter implements Inserter {
    +        private JCQueue q;
    +        private final int batchSz;
    +        private ArrayList<Object> currentBatch;
    +
    +        public BatchInserter(JCQueue q, int batchSz) {
    +            this.q = q;
    +            this.batchSz = batchSz;
    +            this.currentBatch = new ArrayList<>(batchSz + 1);
    +        }
    +
    +        /** Blocking call - retries till element is successfully added */
    +        @Override
    +        public void publish(Object obj) throws InterruptedException {
    +            currentBatch.add(obj);
    +            if (currentBatch.size() >= batchSz) {
    +                flush();
    +            }
    +        }
    +
    +        /** Non-Blocking call. return value indicates success/failure */
    +        @Override
    +        public boolean tryPublish(Object obj) {
    +            if (currentBatch.size() >= batchSz) {
    +                if (!tryFlush()) {
    +                    return false;
    +                }
    +            }
    +            currentBatch.add(obj);
    +            return true;
    +        }
    +
    +        /** Blocking call - Does not return until at least 1 element is 
drained or Thread.interrupt() is received.
    +         *    Uses backpressure wait strategy. */
    +        @Override
    +        public void flush() throws InterruptedException {
    +            if (currentBatch.isEmpty()) {
    +                return;
    +            }
    +            int publishCount = q.tryPublishInternal(currentBatch);
    +            int retryCount = 0;
    +            while (publishCount == 0) { // retry till at least 1 element 
is drained
    +                q.metrics.notifyInsertFailure();
    +                if (retryCount==0) { // check avoids multiple log msgs 
when in a idle loop
    +                    LOG.debug("Experiencing Back Pressure when flushing 
batch to Q: {}. Entering BackPressure Wait.", q.getName());
    +                }
    +                retryCount = q.backPressureWaitStrategy.idle(retryCount);
    +                if (Thread.interrupted()) {
    +                    throw new InterruptedException();
    +                }
    +                publishCount = q.tryPublishInternal(currentBatch);
    +            }
    +            currentBatch.subList(0, publishCount).clear();
    +        }
    +
    +        /** Non blocking call. tries to flush as many as possible. Returns 
true if at least one from non-empty currentBatch was flushed
    +         *      or if currentBatch is empty. Returns false otherwise */
    +        @Override
    +        public boolean tryFlush() {
    +            if (currentBatch.isEmpty()) {
    +                return true;
    +            }
    +            int publishCount = q.tryPublishInternal(currentBatch);
    +            if (publishCount == 0) {
    +                q.metrics.notifyInsertFailure();
    +                return false;
    +            } else {
    +                currentBatch.subList(0, publishCount).clear();
    +                return true;
    +            }
    +        }
    +    } // class BatchInserter
    +
    +    /**
    +     * This inner class provides methods to access the metrics of the 
disruptor recvQueue.
    +     */
    +    public class QueueMetrics {
    +        private final RateTracker arrivalsTracker = new RateTracker(10000, 
10);
    +        private final RateTracker insertFailuresTracker = new 
RateTracker(10000, 10);
    +        private final AtomicLong droppedMessages = new AtomicLong(0);
    +
    +        public long population() {
    +            return recvQueue.size();
    +        }
    +
    +        public long capacity() {
    +            return recvQueue.capacity();
    +        }
    +
    +        public Object getState() {
    +            HashMap state = new HashMap<String, Object>();
    +
    +            final double arrivalRateInSecs = arrivalsTracker.reportRate();
    +
    +            long tuplePop = population();
    +
    +            // Assume the recvQueue is stable, in which the arrival rate 
is equal to the consumption rate.
    +            // If this assumption does not hold, the calculation of 
sojourn time should also consider
    +            // departure rate according to Queuing Theory.
    +            final double sojournTime = tuplePop / 
Math.max(arrivalRateInSecs, 0.00001) * 1000.0;
    +
    +            long cap = capacity();
    +            float pctFull = (1.0F * tuplePop / cap);
    +
    +            state.put("capacity", cap);
    +            state.put("pct_full", pctFull);
    +            state.put("population", tuplePop);
    +
    +            state.put("arrival_rate_secs", arrivalRateInSecs);
    +            state.put("sojourn_time_ms", sojournTime); //element sojourn 
time in milliseconds
    +            state.put("insert_failures", 
insertFailuresTracker.reportRate());
    +            state.put("dropped_messages", droppedMessages);
    +            state.put("overflow", overflowQ.size());
    +            return state;
    +        }
    +
    +        public void notifyArrivals(long counts) {
    +            arrivalsTracker.notify(counts);
    +        }
    +
    +        public void notifyInsertFailure() {
    +            insertFailuresTracker.notify(1);
    +        }
    +
    +        public void notifyDroppedMsg() {
    +            droppedMessages.incrementAndGet();
    +        }
    +
    +        public void close() {
    +            arrivalsTracker.close();
    +            insertFailuresTracker.close();
    +        }
    +
    +    }
    +
    +    private final MpscArrayQueue<Object> recvQueue;
    +    private final MpscUnboundedArrayQueue<Object> overflowQ; // only holds 
msgs from other workers (via WorkerTransfer), when recvQueue is full
    +    private final int overflowLimit; // ensures... overflowCount <= 
overflowLimit. if set to 0, disables overflow.
    +
    +
    +    private final int producerBatchSz;
    +    private final DirectInserter directInserter = new DirectInserter(this);
    +
    +    private final ThreadLocal<BatchInserter> thdLocalBatcher = new 
ThreadLocal<BatchInserter>();
    +
    +    private final JCQueue.QueueMetrics metrics;
    +
    +    private String queueName;
    +    private final IWaitStrategy backPressureWaitStrategy;
    +
    +    public JCQueue(String queueName, int size, int overflowLimit, int 
producerBatchSz, IWaitStrategy backPressureWaitStrategy) {
    +        this.queueName = queueName;
    +        this.overflowLimit = overflowLimit;
    +        this.recvQueue = new MpscArrayQueue<>(size);
    +        this.overflowQ = new MpscUnboundedArrayQueue<>(size);
    +
    +        this.metrics = new JCQueue.QueueMetrics();
    +
    +        //The batch size can be no larger than half the full recvQueue 
size, to avoid contention issues.
    +        this.producerBatchSz = Math.max(1, Math.min(producerBatchSz, size 
/ 2));
    +        this.backPressureWaitStrategy = backPressureWaitStrategy;
    +    }
    +
    +    public String getName() {
    +        return queueName;
    +    }
    +
    +
    +    public boolean haltWithInterrupt() {
    +        boolean res = tryPublishInternal(INTERRUPT);
    +        metrics.close();
    +        return res;
    +    }
    +
    +
    +    /**
    +     * Non blocking. Returns immediately if Q is empty. Returns number of 
elements consumed from Q
    +     */
    +    public int consume(JCQueue.Consumer consumer) {
    +        return consume(consumer, continueRunning);
    +    }
    +
    +    /**
    +     * Non blocking. Returns immediately if Q is empty. Runs till Q is 
empty OR exitCond.keepRunning() return false.
    +     * Returns number of elements consumed from Q
    +     */
    +    public int consume(JCQueue.Consumer consumer, ExitCondition exitCond) {
    +        try {
    +            return consumeImpl(consumer, exitCond);
    +        } catch (InterruptedException e) {
    +            throw new RuntimeException(e);
    +        }
    +    }
    +
    +    public int size() { return recvQueue.size() + overflowQ.size(); }
    +
    +    /**
    +     * Non blocking. Returns immediately if Q is empty. Returns number of 
elements consumed from Q
    +     *  @param exitCond
    +     */
    +    private int consumeImpl(Consumer consumer, ExitCondition exitCond) 
throws InterruptedException {
    +        int drainCount = 0;
    +        while ( exitCond.keepRunning() ) {
    +            Object tuple = recvQueue.poll();
    +            if (tuple == null) {
    +                break;
    +            }
    +            consumer.accept(tuple);
    +            ++drainCount;
    +        }
    +
    +        int overflowDrainCount = 0;
    +        int limit = overflowQ.size();
    +        while (exitCond.keepRunning()  &&  (overflowDrainCount < limit)) { 
// 2nd cond prevents staying stuck with consuming overflow
    +            Object tuple = overflowQ.poll();
    +            ++overflowDrainCount;
    +            consumer.accept(tuple);
    +        }
    +        int total = drainCount + overflowDrainCount;
    +        if (total > 0) {
    +            consumer.flush();
    +        } else {
    +            emptyMeter.record();
    +        }
    +        return total;
    +    }
    +
    +    // Non Blocking. returns true/false indicating success/failure. Fails 
if full.
    +    private boolean tryPublishInternal(Object obj) {
    +        if (recvQueue.offer(obj)) {
    +            metrics.notifyArrivals(1);
    +            return true;
    +        }
    +        return false;
    +    }
    +
    +    // Non Blocking. returns count of how many inserts succeeded
    +    private int tryPublishInternal(ArrayList<Object> objs) {
    +        MessagePassingQueue.Supplier<Object> supplier =
    +            new MessagePassingQueue.Supplier<Object>() {
    +                int i = 0;
    +
    +                @Override
    +                public Object get() {
    +                    return objs.get(i++);
    +                }
    +            };
    +        int count = recvQueue.fill(supplier, objs.size());
    +        metrics.notifyArrivals(count);
    +        return count;
    +    }
    +
    +    private Inserter getInserter() {
    +        Inserter inserter;
    +        if (producerBatchSz > 1) {
    +            inserter = thdLocalBatcher.get();
    +            if (inserter == null) {
    +                BatchInserter b = new BatchInserter(this, producerBatchSz);
    --- End diff --
    
    This is to avoid casting: `thdLocalBatcher.set(b)` expects a
    `BatchInserter`, whereas `inserter` is declared as the base type
    `Inserter`.


---

Reply via email to