Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2588#discussion_r173635422
  
    --- Diff: 
external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/core/PartitionManager.java
 ---
    @@ -0,0 +1,132 @@
    
+/*******************************************************************************
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + 
*******************************************************************************/
    +package org.apache.storm.eventhubs.core;
    +
    +import java.util.LinkedHashMap;
    +import java.util.Map;
    +import java.util.TreeSet;
    +
    +import org.apache.storm.eventhubs.state.IStateStore;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.esotericsoftware.minlog.Log;
    +import com.microsoft.azure.eventhubs.EventData;
    +
    +public class PartitionManager extends SimplePartitionManager {
    +    private static final Logger logger = 
LoggerFactory.getLogger(PartitionManager.class);
    +
    +    // all sent events are stored in pending
    +    private final Map<String, EventHubMessage> pending;
    +
    +    // all failed events are put in toResend, which is sorted by event's 
offset
    +    private final TreeSet<EventHubMessage> toResend;
    +
    +    private final TreeSet<EventHubMessage> waitingToEmit;
    +
    +    public PartitionManager(EventHubConfig ehConfig, String partitionId, 
IStateStore stateStore,
    +                            IEventHubReceiver receiver) {
    +        super(ehConfig, partitionId, stateStore, receiver);
    +
    +        this.pending = new LinkedHashMap<String, EventHubMessage>();
    +        this.toResend = new TreeSet<EventHubMessage>();
    +        this.waitingToEmit = new TreeSet<EventHubMessage>();
    +    }
    +
    +    private void fill() {
    +        Iterable<EventData> receivedEvents = 
receiver.receive(config.getReceiveEventsMaxCount());
    +        if (receivedEvents == null || 
receivedEvents.spliterator().getExactSizeIfKnown() == 0) {
    +            logger.debug("No messages received from EventHub.");
    +            return;
    +        }
    +
    +        String startOffset = null;
    +        String endOffset = null;
    +        for (EventData ed : receivedEvents) {
    +            EventHubMessage ehm = new EventHubMessage(ed, partitionId);
    +            startOffset = (startOffset == null) ? ehm.getOffset() : 
startOffset;
    +            endOffset = ehm.getOffset();
    +            waitingToEmit.add(ehm);
    +        }
    +
    +        logger.debug("Received Messages Start Offset: " + startOffset + ", 
End Offset: " + endOffset);
    +    }
    +
    +    @Override
    +    public EventHubMessage receive() {
    +        logger.debug("Retrieving messages for partition: " + partitionId);
    +        int countToRetrieve = pending.size() - 
config.getMaxPendingMsgsPerPartition();
    --- End diff --
    
    The name on this is confusing. It's the number of messages that pending is 
over the limit, not the number of messages to retrieve.


---

Reply via email to