Hi,

I've updated the trade demo (tradedemo) example for the new addressing and 
renamed it feeddemo (as it is really about ticker and market data feeds and not 
trades).

I'd like to know if this sort of demo is still useful.  I think it is, but we 
also have the reservation system demo. What's nice about this example is that 
it uses TTL and LVQ.

See the attached files. Note that the OptionParser.h/.cpp is the one already 
used in drain/spout etc. so it ought to be made common. 

It can be tested by:

1. running setup_broker.sh which creates the exchanges.
2. Run a feed_listener (you can run it for ticker info or market data using -t 
or -m, or with a custom exchange, or all three):
a. ./feed_listener -t 1 -m 0  (actually default is this so you can just run 
./feed_listener)
b. ./feed_listener -t 0 -m 1
c. ./feed_listener -t 0 -m 0 TICKER/NYSE.RHT
d. ./feed_listener -m 1 TICKER/NYSE.RHT        (TICKER/NYSE.RHT is just an 
example)
3. Run feed_publisher e.g.:   ./feed_publisher -c 1000


William

Attachment: setup_broker.sh
Description: application/shellscript

/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * 
 *   http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */

/**
 *  feed_publisher.cpp:
 *
 *  This program is one of two programs and a shell script designed to 
 *  be used together. 
 *  These programs implement a publish-subscribe example
 *  using a couple of exchanges (TICKER and MRKT). 
 *  In the example multiple listeners
 *  can subscribe to the same queues for TTL messages.  
 *  The TTL messages are all ticker price data. Messages are 
 *  browsed and therefore shared among the multiple listeners. 
 *  Messages timeout using TTL so that they don't stay in the queue 
 *  for too long and fill it up.  
 *  Local exclusive LVQ are also declared for market data.
 *
 *   setup_broker.sh 
 *
 *     Declares a couple of exchanges.
 *
 *   feed_publisher.cpp (this program)
 *
 *      Sends messages to the "TICKER" or "MRKT" exchange, using the
 *      multipart routing keys (subjects) for ticker price and market data.
 *      Ticker messages are sent using a TTL value.
 *
 *   feed_listener.cpp
 *
 *      Subscribes to non-exclusive queues in NOT_ACQUIRE mode for
 *      ticker price data and declares two LVQs for market data.
 *
 *      Multiple listeners can be run at the same time.
 *
 */


#include <qpid/messaging/Address.h>
#include <qpid/messaging/Connection.h>
#include <qpid/messaging/Message.h>
#include <qpid/messaging/Sender.h>
#include <qpid/messaging/Session.h>
#include <qpid/types/Variant.h>
#include "OptionParser.h"


#include <stdlib.h>
#include <cstdlib>
#include <iostream>
#include <set>
#include <sstream>

using namespace qpid::messaging;
using namespace qpid::types;


using std::stringstream;
using std::string;


/**
 * Command-line options for the feed publisher.
 *
 * Fields are bound to options in the constructor and filled in by
 * OptionParser::parse():
 *   url               broker to connect to (default "127.0.0.1")
 *   connectionOptions connection options string passed to the Connection
 *   cycles            number of publishing cycles to run (default 100)
 *   ttl               ticker message time to live, in seconds (default 4)
 *   ticker, market    1/0 switches selecting which feeds are published
 *   address           first positional argument, set by checkAddress()
 */
struct Options : OptionParser
{
    std::string url;
    std::string address;
    std::string connectionOptions;
    int cycles;
    int ttl;
    int ticker, market;

    Options()
        : OptionParser("Usage: feed_publisher [OPTIONS]", "Publish ticker (default) and/or market data messages"),
          url("127.0.0.1"),
          cycles(100),
          ttl(4),
          ticker(1),
          market(0)
    {
        add("broker,b", url, "url of broker to connect to");
        // The value is multiplied by Duration::SECOND when the message is built,
        // so the unit is seconds and the default shown must match ttl(4) above.
        add("ttl,l", ttl, "message time to live (ttl) in seconds [4]");
        add("ticker,t", ticker, "publisher is ticker feed [1 or 0] [default on]");
        add("market,m", market, "publisher is market data feed [1 or 0] [default off]");
        add("cycles,c", cycles, "stop after cycles publishing cycles [100]");
        add("connection-options", connectionOptions, "connection options string in the form {name1=value1, name2=value2}");
    }

    /**
     * Copies the first positional argument into `address`.
     *
     * Not currently called by the publisher; kept for possible later use.
     *
     * @return true if an address was supplied; false (after reporting an
     *         error through OptionParser::error) otherwise.
     */
    bool checkAddress()
    {
        if (getArguments().empty()) {
            error("Address is required");
            return false;
        }
        address = getArguments()[0];
        return true;
    }
};

/**
 * Publishes ticker price updates and market data summaries for a fixed
 * set of six symbols over a session supplied by the caller.
 */
class Publisher {
  private:
    Session& session;               // session the senders are created on
    Sender pub_ticker, pub_market;  // one sender per feed
    int ttl_time;                   // ticker message TTL, in seconds
    unsigned long seq;              // running count of market messages sent

    // Per-symbol market statistics, indexed by symbol number (0..5).
    unsigned short high_[6];
    unsigned short low_[6];
    unsigned long  shares_[6];
    unsigned long  volume_[6];

  public:
    Publisher(Session& session,
              const int ttl_time,
              const unsigned long shares[6]);

    // Sends one ticker message; curr_price is updated in place.
    virtual void publish_ticker(const std::string queue, unsigned short& curr_price);

    // Sends one market data message for symbol number i.
    virtual void publish_market(const std::string queue, unsigned short& curr_price, int i);

    virtual ~Publisher() {}
};

/**
 * Creates the two senders and resets the per-symbol statistics.
 *
 * @param session   session used to create the senders
 * @param ttl_time  time to live for ticker messages, in seconds
 * @param shares    shares outstanding for each of the six symbols
 */
Publisher::Publisher(Session& session, int ttl_time, const unsigned long shares[6])
    : session(session),
      ttl_time(ttl_time),
      seq(0)
{
    // The sender addresses carry a default subject; each message sets its own
    // subject before it is sent.
    pub_ticker = session.createSender("TICKER/NYSE");
    pub_market = session.createSender("MRKT/NYSE");

    for (int idx = 0; idx != 6; ++idx) {
        shares_[idx] = shares[idx];
        volume_[idx] = 0;
        low_[idx]    = 9999;
        high_[idx]   = 0;
    }
}


/**
 * Applies a small random price movement to curr_price and publishes the
 * result as a ticker message with a TTL.
 *
 * @param symbol      subject (routing key) of the message, e.g. "NYSE.RHT"
 * @param curr_price  current price; updated in place, never below 0
 */
void Publisher::publish_ticker(const std::string symbol, unsigned short& curr_price)
{
    Message message;

    // The subject is set per message, so each symbol gets its own routing key.
    std::cout << "TICKER Subject/Symbol:" << symbol << std::endl;
    message.setSubject(symbol);

    // Randomly pick the size (0..2) and then the direction of the movement.
    const unsigned short change = rand() % 3;
    const bool rising = (rand() % 2 == 0);
    if (rising) {
        curr_price += change;
    } else {
        // Subtract in int so the price can be clamped at zero.
        const int lowered = curr_price - change;
        curr_price = lowered > 0 ? lowered : 0;
    }

    // A zero change is reported as "--" whatever direction was drawn.
    const char* movement = "] [--]";
    if (change) {
        movement = rising ? "] [UP]" : "] [DOWN]";
    }

    std::stringstream ticker_data;
    ticker_data << "[TICKER] " << "Symbol:" << symbol
                << "   \tPrice[" << curr_price << "] \t["
                << change << movement;
    message.setContent(ticker_data.str());

    // The TTL lets the message time out and be purged from queues.
    message.setTtl(ttl_time*Duration::SECOND);
    pub_ticker.send(message);
}

/**
 * Updates the running market statistics for one symbol and publishes them
 * as a market data message.
 *
 * @param symbol      subject (routing key) of the message; also used as the
 *                    "qpid.LVQ_key" property value
 * @param curr_price  current price of the symbol (read only here)
 * @param i           symbol number indexing the per-symbol arrays; must be
 *                    in the range 0..5 (not checked)
 */
void Publisher::publish_market(const std::string symbol, unsigned short& curr_price, int i)
{
  Message message;

  // Set the routing key
  std::cout << "MARKET Subject/Symbol:" << symbol << std::endl;
  message.setSubject(symbol);

  // Track the daily low and high independently. low_ starts at 9999 and
  // high_ starts at 0, so plain comparisons are enough: the first price seen
  // replaces both, and a genuine low of 0 is no longer mistaken for "unset".
  if (curr_price < low_[i])
  {
    low_[i] = curr_price;
  }
  if (curr_price > high_[i])
  {
    high_[i] = curr_price;
  }

  volume_[i] += rand() % 1000;  // increase the daily volume tracker

  // Market cap in millions (shares_ is in millions). Kept as unsigned long to
  // match shares_ and avoid narrowing the product to int.
  const unsigned long mkt_cap = shares_[i] * curr_price;

  std::stringstream market_data;
  // Build up the market data info
  market_data << "[MARKET] " << "Symbol:" << symbol << "\tVolume: " << volume_[i]
              << "\tHi:" << high_[i] << "\tLo:" << low_[i] << "\tMktCap:"
              << mkt_cap <<"M\tSEQ[" << seq << "]";

  message.setContent(market_data.str());

  // Key used by the last-value queue to decide which message a new one replaces.
  message.getProperties()["qpid.LVQ_key"] = symbol;

  pub_market.send(message);
  seq++;  // This sequence number is really just to demonstrate the LVQ nature of the queue.
          // You will notice some messages don't show because they are overwritten by last value.
}


/**
 * Entry point of the feed publisher.
 *
 * Sets up six symbols with rough random starting prices, connects to the
 * broker and, for the requested number of cycles, publishes ticker and/or
 * market data for every symbol.
 *
 * @return 0 on success; 1 if option parsing fails or an exception is caught.
 */
int main(int argc, char** argv) {
    const int kSymbols = 6;

    // Stock symbols, in the order used to index the per-symbol data.
    const std::string symbol[kSymbols] = {
        "NYSE.RHT",     // Red Hat
        "NYSE.IBM",     // IBM Corp.
        "NASDAQ.MSFT",  // Microsoft
        "NASDAQ.CSCO",  // Cisco Systems
        "NASDAQ.YHOO",  // Yahoo
        "NASDAQ.GOOG"   // Google
    };

    // Rough starting values: a random price in 1..range for each symbol.
    const int price_range[kSymbols] = {30, 120, 20, 75, 10, 323};
    unsigned short price[kSymbols];
    for (int s = 0; s < kSymbols; ++s) {
        price[s] = rand() % price_range[s] + 1;
    }

    // Shares outstanding in millions.
    const unsigned long shares[kSymbols] = {190, 1340, 8890, 5860, 1390, 314};

    Options options;
    if (!options.parse(argc, argv)) {
        return 1;
    }

    std::cout << "URL[" << options.url << "] Options[" << options.connectionOptions << "]" << std::endl;
    Connection connection(options.url, options.connectionOptions);
    try {
        connection.open();
        Session session = connection.createSession();

        Publisher theFeed(session, options.ttl, shares);

        // Print the opening values for each symbol
        std::cout << std::endl << "Opening values:" << std::endl;
        for (int s = 0; s < kSymbols; ++s) {
            std::cout << symbol[s] << ":" << price[s] << std::endl;
        }

        // Each cycle publishes the selected feeds once for every symbol.
        for (int cycle = 0; cycle < options.cycles; ++cycle) {
            for (int s = 0; s < kSymbols; ++s) {
                if (options.ticker) {
                    theFeed.publish_ticker(symbol[s], price[s]);
                }
                if (options.market) {
                    theFeed.publish_market(symbol[s], price[s], s);
                }
            }
        }

        session.sync();
        connection.close();
        return 0;
    } catch (const std::exception& error) {
        std::cout << error.what() << std::endl;
    }
    return 1;
}


/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * 
 *   http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */

/*
 *  feed_listener.cpp:
 *
 *  This program is one of two programs and a shell script designed to 
 *  be used together. 
 *  These programs implement a publish-subscribe example
 *  using a couple of exchanges (TICKER and MRKT). 
 *  In the example multiple listeners
 *  can subscribe to the same queues for TTL messages.  
 *  The TTL messages are all ticker price data. Messages are 
 *  browsed and therefore shared among the multiple listeners. 
 *  Messages timeout using TTL so that they don't stay in the queue 
 *  for too long and fill it up.  
 *  Local exclusive LVQ are also declared for market data.
 *
 *   setup_broker.sh 
 *
 *     Declares a couple of exchanges.
 *
 *   feed_publisher.cpp 
 *
 *      Sends messages to the "TICKER" or "MRKT" exchange, using the
 *      multipart routing keys (subjects) for ticker price and market data
 *      Ticker messages are sent using a TTL value.
 *
 *   feed_listener.cpp (this program)
 *
 *      Subscribes to non-exclusive queues in NOT_ACQUIRE mode for
 *      ticker price data and declares two LVQs for market data.
 *
 *      Multiple listeners can be run at the same time.
 *
 */

#include <qpid/messaging/Address.h>
#include <qpid/messaging/Connection.h>
#include <qpid/messaging/Message.h>
#include <qpid/messaging/Receiver.h>
#include <qpid/messaging/Session.h>

#include <cstdlib>
#include <iostream>
#include "OptionParser.h"

// Address used for the ticker subscription: the TICKER node, with link
// bindings for the keys NYSE.#, NASDAQ.# and CNTL.#.
#define TICKER_ADDR "TICKER;{link:{x-bindings:[{key:'NYSE.#'},{key:'NASDAQ.#'},{key:'CNTL.#'}]}}"
// Address used for the market data subscription: the MRKT node, with the same
// three binding keys and qpid.last_value_queue:1 passed as a declare argument
// so that the Market Data queue will use LVQ.
// NOTE(review): the NYSE.# and NASDAQ.# keys are unquoted here but quoted in
// TICKER_ADDR -- confirm the address parser treats both forms alike.
#define MARKET_ADDR "MRKT;{link:{x-declare:{arguments:{qpid.last_value_queue:1}}, x-bindings:[{key:NYSE.#},{key:NASDAQ.#},{key:'CNTL.#'}]}}"

using namespace qpid::messaging;
using namespace qpid::types;


/**
 * Command-line options for the feed listener.
 *
 * Fields are bound to options in the constructor and filled in by
 * OptionParser::parse():
 *   url               broker to connect to (default "127.0.0.1")
 *   connectionOptions connection options string passed to the Connection
 *   ticker, market    1/0 switches selecting which feeds are subscribed to
 *   address           optional custom address, set by checkAddress()
 */
struct Options : OptionParser
{
    std::string url;
    std::string address;
    std::string connectionOptions;
    int ticker;
    int market;

    Options()
        // The usage line names this program (it previously said "drain").
        : OptionParser("Usage: feed_listener [OPTIONS] [ADDRESS]", "Listen for messages from TICKER and/or MRKT"),
          url("127.0.0.1"),
          ticker(1),
          market(0)
    {
        add("broker,b", url, "url of broker to connect to");
        add("ticker,t", ticker, "listener on ticker feed [1 or 0] [default on]");
        add("market,m", market, "listener on market data feed [1 or 0] [default off]");
        add("connection-options", connectionOptions, "connection options string in the form {name1=value1, name2=value2}");
    }

    /**
     * Copies the first positional argument, if any, into `address`.
     *
     * The address is optional here, so its absence is not reported as an error.
     *
     * @return true if a custom address was supplied, false otherwise.
     */
    bool checkAddress()
    {
        if (getArguments().empty()) {
            return false;
        }
        address = getArguments()[0];
        return true;
    }
};

/**
 * Wraps a session: subscribe() creates receivers on it and listen() prints
 * the messages those receivers deliver.
 *
 * The class keeps no Receiver members; receivers are reached through the
 * session. The destructor is virtual because the class has virtual methods.
 */
class Listener  {
  private:
    Session& session;   // session supplied by the caller; not owned
  public:
    Listener(Session& session);
    virtual void listen();
    virtual void subscribe(std::string addr);
    virtual ~Listener() {}
};


/*
 *  Listener::Listener
 *
 *  Stores the session the listener works on. No receivers are created
 *  here; they are added later through subscribe().
 */

Listener::Listener(Session& session)
    : session(session)
{}

void Listener::subscribe(std::string addr)
{
  // There is no need to hold onto the receiver as a class member. (or array of Receivers)
  // If used, a local Reciever is only a local handle. The local Receiver dtor would not
  // close the receiver. 
  // Receivers will only be closed and deleted when the session or connection that owns them closes.

  std::cout << "Subscribing to address[" << addr <<  "]" << std::endl;
  session.createReceiver(addr).setCapacity(1);
}


/*
 *  Listener::listen
 *
 *  Fetches messages from whichever receiver on the session has one
 *  available, printing the subject and content of each. Every message is
 *  acknowledged before the next one is fetched. Returns once a message with
 *  the subject "CNTL.END" has been printed.
 */
void Listener::listen() {
  std::cout << "Listening for messages ..." << std::endl;

  for (;;) {
    // The subject is read fresh from each message. The previous version
    // redeclared message/subject inside the loop body, so the loop condition
    // only ever saw the first subject and the loop could never end.
    Message message = session.nextReceiver().fetch();
    const std::string subject = message.getSubject();
    std::cout << "Subject[" << subject << "] Content [" << message.getContent() << "]"<< std::endl;

    if (subject == "CNTL.END") {
      break;
    }
    session.acknowledge(); // acknowledge message receipt
  }

  std::cout << "Control message recieved to end. Ending." << std::endl;
}

/*
 *  Entry point of the feed listener.
 *
 *  Connects to the broker, subscribes to the ticker feed, the market data
 *  feed and/or a custom address as selected on the command line, then
 *  listens for messages.
 *
 *  Returns 0 on a clean finish, 1 if option parsing fails or an exception
 *  is caught.
 */
int main(int argc, char** argv) {
  Options options;
  if (!options.parse(argc, argv)) {
    return 1;
  }

  Connection connection(options.url, options.connectionOptions);
  try {
    connection.open();
    Session session = connection.createSession();

    // The listener starts with no receivers; each subscribe() adds one.
    Listener listener(session);

    if (options.ticker) {
      listener.subscribe(TICKER_ADDR);
    }
    if (options.market) {
      listener.subscribe(MARKET_ADDR);
    }
    // A custom address might be something like MRKT/NYSE.RHT or TICKER/NYSE.IBM
    if (options.checkAddress()) {
      listener.subscribe(options.address);
    }

    std::cout << "Starting Listener <Ctrl>-C to exit." << std::endl;

    // Hand over control until the listener returns.
    listener.listen();

    session.close();
    connection.close();
    return 0;
  } catch (const std::exception& error) {
    std::cout << error.what() << std::endl;
  }
  return 1;
}

/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * 
 *   http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
#include "OptionParser.h"
#include <qpid/types/Exception.h>
#include <algorithm>
#include <iostream>
#include <iomanip>
#include <sstream>
#include <cstdlib>

/**
 * Abstract base for one command-line option.
 *
 * Holds the option's long name, short name and description; subclasses
 * decide whether a value is expected and what to do with it.
 */
class Option
{
    friend class OptionParser;

  public:
    /** @param name "long" or "long,short"; @param description help text */
    Option(const std::string& name, const std::string& description);
    virtual ~Option() {}

    /** Applies the textual value given on the command line. */
    virtual void setValue(const std::string&) = 0;
    /** Whether the option is followed by a value argument. */
    virtual bool isValueExpected() = 0;

    /** True if the given name equals the long or the short name. */
    bool match(const std::string&);
    /** Writes one help line for this option to out. */
    std::ostream& print(std::ostream& out);

  private:
    std::string longName;
    std::string shortName;
    std::string description;

    std::ostream& printNames(std::ostream& out);
};

/** Option that stores its value, unchanged, in a caller-owned string. */
class StringOption : public Option
{
  public:
    StringOption(const std::string& name, const std::string& description, std::string& v)
        : Option(name, description),
          target(v)
    {}

    void setValue(const std::string& v)
    {
        target = v;
    }

    bool isValueExpected()
    {
        return true;
    }

  private:
    std::string& target;  // destination bound at construction
};

/** Option that converts its value with atoi and stores it in a caller-owned int. */
class IntegerOption : public Option
{
  public:
    IntegerOption(const std::string& name, const std::string& description, int& v)
        : Option(name, description),
          target(v)
    {}

    void setValue(const std::string& v)
    {
        const char* const text = v.c_str();
        target = atoi(text);
    }

    bool isValueExpected()
    {
        return true;
    }

  private:
    int& target;  // destination bound at construction
};

/** Flag option: takes no value and sets a caller-owned bool to true when present. */
class BooleanOption : public Option
{
  public:
    BooleanOption(const std::string& name, const std::string& description, bool& v)
        : Option(name, description),
          flag(v)
    {}

    // The argument is ignored; presence of the option is what matters.
    void setValue(const std::string&)
    {
        flag = true;
    }

    bool isValueExpected()
    {
        return false;
    }

  private:
    bool& flag;  // destination bound at construction
};

/** Repeatable option: each occurrence appends its value to a caller-owned vector. */
class MultiStringOption : public Option
{
  public:
    MultiStringOption(const std::string& name, const std::string& description, std::vector<std::string>& v)
        : Option(name, description),
          collected(v)
    {}

    void setValue(const std::string& v)
    {
        collected.push_back(v);
    }

    bool isValueExpected()
    {
        return true;
    }

  private:
    std::vector<std::string>& collected;  // destination bound at construction
};

/**
 * Predicate that holds the option name extracted from a command-line
 * argument and tests Option objects against it.
 */
class OptionMatch
{
  public:
    OptionMatch(const std::string& argument);

    /** True if the held name matches the given option. */
    bool operator()(Option* option);

    /** True if the argument yielded a non-empty option name. */
    bool isOption();

  private:
    std::string name;
};

/** Exception thrown for command-line errors found while parsing options. */
class OptionsError : public qpid::types::Exception
{
  public:
    OptionsError(const std::string& message)
        : qpid::types::Exception(message)
    {}
};

/**
 * Splits name at the first comma: the text before it becomes the long name
 * and any text after it the short name. Without a comma the whole of name
 * is the long name and the short name stays empty.
 */
Option::Option(const std::string& name, const std::string& desc) : description(desc)
{
    const std::string::size_type comma = name.find(",");
    if (comma == std::string::npos) {
        longName = name;
        return;
    }

    longName = name.substr(0, comma);
    const std::string::size_type afterComma = comma + 1;
    if (afterComma < name.size()) {
        shortName = name.substr(afterComma);
    }
}

/** Returns true if name equals this option's long name or its short name. */
bool Option::match(const std::string& name)
{
    if (name == longName) {
        return true;
    }
    return name == shortName;
}

/**
 * Writes the option's names in help format: "-s, --long" when there is a
 * short name, otherwise "--long". Each name is followed by " VALUE" when
 * the option expects a value.
 */
std::ostream& Option::printNames(std::ostream& out)
{
    if (!shortName.empty()) {
        out << "-" << shortName;
        if (isValueExpected()) {
            out << " VALUE";
        }
        out << ", --" << longName;
    } else {
        out << "--" << longName;
    }

    // The long name is always written last, so one trailing check covers both forms.
    if (isValueExpected()) {
        out << " VALUE";
    }
    return out;
}

/**
 * Writes one help line: the option names left-aligned in a 30-character
 * column, followed by the description and a newline.
 */
std::ostream& Option::print(std::ostream& out)
{
    // Render the names first so they can be padded as a single field.
    std::stringstream label;
    printNames(label);
    const std::string names = label.str();

    out << std::setw(30) << std::left << names;
    out << description << std::endl;
    return out;
}

/** Returns the positional (non-option) arguments collected by parse(). */
std::vector<std::string>& OptionParser::getArguments()
{
    return arguments;
}

/** Registers an option; it is deleted in the parser's destructor. */
void OptionParser::add(Option* option)
{
    options.insert(options.end(), option);
}

void OptionParser::add(const std::string& name, std::string& value, const std::string& description)
{
    add(new StringOption(name, description, value));
}
void OptionParser::add(const std::string& name, int& value, const std::string& description)
{
    add(new IntegerOption(name, description, value));
}
void OptionParser::add(const std::string& name, bool& value, const std::string& description)
{
    add(new BooleanOption(name, description, value));
}
void OptionParser::add(const std::string& name, std::vector<std::string>& value, const std::string& description)
{
    add(new MultiStringOption(name, description, value));
}

/**
 * Extracts the option name from an argument by stripping one or two
 * leading dashes. An argument without a leading dash leaves the name empty.
 */
OptionMatch::OptionMatch(const std::string& argument)
{
    // Count at most two leading dashes ("--name" or "-n").
    std::string::size_type dashes = 0;
    while (dashes < 2 && dashes < argument.size() && argument[dashes] == '-') {
        ++dashes;
    }

    if (dashes != 0) {
        name = argument.substr(dashes);
    }
}

/** Returns true if the held name matches the given option's long or short name. */
bool OptionMatch::operator()(Option* option)
{
    const bool matched = option->match(name);
    return matched;
}

/** Returns true if a non-empty option name was extracted from the argument. */
bool OptionMatch::isOption()
{
    return !name.empty();
}

/**
 * Stores the usage summary and description, and registers the built-in
 * --help / -h flag.
 */
OptionParser::OptionParser(const std::string& s, const std::string& d)
    : summary(s),
      description(d),
      help(false)
{
    add("help,h", help, "show this message");
}

/**
 * Looks up the registered option named by a command-line argument.
 *
 * @return the matching option, or 0 if the argument does not look like an
 *         option.
 * @throws OptionsError if the argument looks like an option but no
 *         registered option matches it.
 */
Option* OptionParser::getOption(const std::string& argument)
{
    OptionMatch match(argument);
    if (!match.isOption()) {
        return 0;
    }

    const Options::iterator found = std::find_if(options.begin(), options.end(), match);
    if (found != options.end()) {
        return *found;
    }

    std::stringstream text;
    text << "Unrecognised option: " << argument;
    throw OptionsError(text.str());
}

/**
 * Reports a command-line error: the usage summary goes to standard output
 * and the error message, with a hint to try --help, to standard error.
 */
void OptionParser::error(const std::string& message)
{
    std::cout << summary << std::endl;
    std::cout << std::endl;

    std::cerr << "Error: " << message;
    std::cerr << "; try --help for more information" << std::endl;
}

bool OptionParser::parse(int argc, char** argv)
{
    try {
        for (int i = 1; i < argc; ++i) {
            std::string argument = argv[i];
            Option* o = getOption(argument);
            if (o) {
                if (o->isValueExpected()) {
                    if (i + 1 < argc) {
                        o->setValue(argv[++i]);
                    } else {
                        std::stringstream error;
                        error << "Value expected for option " << o->longName;
                        throw OptionsError(error.str());
                    }
                } else {
                    o->setValue("");
                }
            } else {
                arguments.push_back(argument);
            }
        }
        if (help) {
            std::cout << summary << std::endl << std::endl;
            std::cout << description << std::endl << std::endl;
            std::cout << "Options: " << std::endl;
            for (Options::iterator i = options.begin(); i != options.end(); ++i) {
                (*i)->print(std::cout);
            }
            return false;
        } else {
            return true;
        }
    } catch (const std::exception& e) {
        error(e.what());
        return false;
    }
}


/** Deletes every option registered with this parser. */
OptionParser::~OptionParser()
{
    const Options::size_type count = options.size();
    for (Options::size_type n = 0; n < count; ++n) {
        delete options[n];
    }
}
#ifndef OPTIONPARSER_H
#define OPTIONPARSER_H

/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * 
 *   http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */

#include <map>
#include <string>
#include <vector>

class Option;

/**
 * Minimal command-line option parser.
 *
 * Options are registered with add(), each bound to a caller-owned variable
 * that receives the parsed value. parse() then walks argv; arguments that
 * are not options are collected and returned by getArguments().
 *
 * The parser owns the Option objects it creates and deletes them in its
 * destructor. Copying is therefore disabled: a copy would hold the same
 * pointers and delete them a second time.
 */
class OptionParser
{
  public:
    /** @param usage one-line usage summary; @param description help text */
    OptionParser(const std::string& usage, const std::string& description);
    ~OptionParser();

    /** Registers an option "long" or "long,short" bound to the given variable. */
    void add(const std::string& name, std::string& value, const std::string& description = std::string());
    void add(const std::string& name, int& value, const std::string& description = std::string());
    void add(const std::string& name, bool& value, const std::string& description = std::string());
    void add(const std::string& name, std::vector<std::string>& value, const std::string& description = std::string());

    /** Parses the command line; returns false on error or when help was shown. */
    bool parse(int argc, char** argv);
    /** Prints the usage summary and the given error message. */
    void error(const std::string& message);
    /** Positional arguments collected by parse(). */
    std::vector<std::string>& getArguments();
  private:
    typedef std::vector<Option*> Options;

    // Declared but not defined: the parser owns raw Option pointers, so
    // copying is not allowed.
    OptionParser(const OptionParser&);
    OptionParser& operator=(const OptionParser&);

    const std::string summary;
    const std::string description;
    bool help;
    Options options;
    std::vector<std::string> arguments;

    void add(Option*);
    Option* getOption(const std::string& argument);
};

#endif  /*!OPTIONPARSER_H*/

---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]

Reply via email to