#!/usr/bin/python
"""Demonstration program for termios VMIN, VTIME

Usage:
    titest [-n] [-s] [-t] [-l] [-0] [-r #] [-w #] [-z #]

  -n: Set non-blocking mode in responder                        Default: off
  -s: Use select before read in responder                       Default: off
  -t: Give a reply that is one byte shorter than requested      Default: off
  -l: Give a reply that is one byte longer than requested       Default: off
  -0: Give a zero-byte reply                                    Default: off
-r #: Set VTIME to (r+99)/100 (i.e. # is VTIME in milliseconds) Default: 300
-w #: Set writer inter-character time to # milliseconds         Default: 200
-z #: Set request size in bytes                                 Default: 4

Theory:
The program allocates a pseudoterminal and then starts a 'responder' thread.
The responder thread reads one byte; the value of that byte is the length
of the response (possibly affected by the -t, -l or -0 flags).

The responder writes one character at a time, with a delay after each
character (-w).  When it receives a '\0' request, it exits.

The main thread invokes the 'requester'.  The requester sets non-canonical mode,
VMIN according to the request size (-z), and VTIME according to the -r
argument.  Then it optionally sets non-blocking mode (-n), optionally selects
(-s), and then reads.  After the read (or failing select) completes, it prints
the result.

(Here, 'master' and 'slave' refer to the two ends of the pseudoterminal.
'slave' behaves like a tty device but actually communicates with the file
descriptor 'master'.  This is not related to the modbus concept of master
and slave devices)

Results:
In non-blocking mode, at most one byte is ever read.

Without select, a zero-length reply causes the requester to block forever.
(may require killing the test program with a signal)

When the requester wait is not longer than the responder delay, a short
read (almost always 1 byte) results.

When the requester wait is longer than the responder delay, select is enabled,
and non-blocking mode is not requested, then all three cases (no reply,
short reply, desired reply) work.

A "too long" response is not detected and extra bytes from an earlier
over-length request will be the first bytes presented at a subsequent
read.

(All tests were performed on Ubuntu 10.04 with stock kernel)

Relation to modbus master:
Termios VTIME and VMIN plus select are an adequate mechanism for
receiving modbus replies.  A non-responsive slave, an error reply, and a
correct reply can all be received without looping on read or
read+select.  Non-blocking mode is harmful and should not be used;
select ensures the program won't block forever.
"""

import fcntl
import getopt
import os
import pty
import select
import sys
import termios
import threading
import time

# Behaviour switches, all off by default; each is toggled by its flag.
do_nonblock = do_select = do_short = do_long = do_noreply = False
request_count = 4       # -z: bytes requested per exchange; also used as VMIN
responder_delay = 200   # -w: milliseconds the responder sleeps after each byte
requester_wait = 300    # -r: milliseconds; source of VTIME and the select timeout

# Parse the command line.  An unknown option or a non-integer argument prints
# the error followed by the module usage text and exits with status 2, rather
# than ending in a traceback.
try:
    opts, args = getopt.getopt(sys.argv[1:], 'nstl0r:w:z:')
    for k, v in opts:
        # Boolean flags toggle, so giving the same flag twice turns it off again.
        if k == '-n':
            do_nonblock = not do_nonblock
        elif k == '-s':
            do_select = not do_select
        elif k == '-t':
            do_short = not do_short
        elif k == '-l':
            do_long = not do_long
        elif k == '-0':
            do_noreply = not do_noreply
        elif k == '-w':
            responder_delay = int(v)
        elif k == '-r':
            requester_wait = int(v)
        elif k == '-z':
            request_count = int(v)
except (getopt.GetoptError, ValueError) as e:
    sys.stderr.write("%s\n\n%s" % (e, __doc__ or ""))
    sys.exit(2)

# Allocate the pseudoterminal pair: the responder thread reads and writes
# `master`, the requester configures and uses `slave`.
master, slave = pty.openpty()


class ResponderThread(threading.Thread):
    """Thread that answers single-byte requests arriving on the pty master.

    Each request is one byte whose numeric value is the number of reply
    bytes wanted.  The reply length is reduced by one when ``do_short`` is
    set, increased by one when ``do_long`` is set, and no reply at all is
    sent when ``do_noreply`` is set.  Reply bytes are written one at a time
    with ``responder_delay`` milliseconds of sleep after each.

    The thread exits when it reads a zero byte, when the read returns no
    data, or when it notices ``quit`` has been set (checked after each read
    and before each reply byte).
    """

    def __init__(self):
        # Flag set by the main thread to ask the responder to stop.
        self.quit = False
        threading.Thread.__init__(self)

    def run(self):
        """Serve requests until told to stop."""
        while 1:
            c = os.read(master, 1)
            # An empty read means end-of-file; ord() on it would raise
            # TypeError, so treat it as a request to stop.
            if not c: return
            o = ord(c)
            if self.quit or not o: return
            if do_noreply: continue
            if do_short: o -= 1
            if do_long: o += 1
            for i in range(o):
                if self.quit: return
                # Bytes literal: the same value as "A" on Python 2.6+, and
                # the type os.write requires on Python 3.
                os.write(master, b"A")
                time.sleep(responder_delay / 1000.)

def requester():
    """Send one request on the pty slave and report how many bytes came back.

    Puts the slave into non-canonical, no-echo mode with VMIN set to
    ``request_count`` and VTIME derived from ``requester_wait``; optionally
    sets O_NONBLOCK (``do_nonblock``); writes a single request byte whose
    value is ``request_count``; optionally waits in select (``do_select``)
    for up to ``requester_wait`` milliseconds; then reads up to
    ``request_count`` bytes and prints the outcome.

    A zero byte is always written to the slave afterwards, which is the
    responder's signal to exit -- including when select or read raises, so
    the responder is not left blocked in its read.
    """
    attr = termios.tcgetattr(slave)
    attr[6][termios.VMIN] = request_count
    # VTIME is expressed in tenths of a second; round the millisecond value
    # up.  Floor division keeps the result an int on every Python version.
    attr[6][termios.VTIME] = (requester_wait + 99) // 100
    attr[3] &= ~(termios.ICANON | termios.ECHO)
    termios.tcsetattr(slave, termios.TCSANOW, attr)

    if do_nonblock:
        flags = fcntl.fcntl(slave, fcntl.F_GETFL)
        fcntl.fcntl(slave, fcntl.F_SETFL, flags | os.O_NONBLOCK)

    # bytes(bytearray([n])) yields the one-byte string chr(n) on Python 2 and
    # the equivalent bytes object on Python 3.
    os.write(slave, bytes(bytearray([request_count])))
    try:
        if do_select:
            readable, _, _ = select.select(
                [slave], [], [], requester_wait / 1000.)
            if not readable:
                print("Wanted %d, select failed" % (request_count))
                return
        got = len(os.read(slave, request_count))
        print("Wanted %d, got %d" % (request_count, got))
    finally:
        # Zero-length request: tells the responder thread to exit.
        os.write(slave, bytes(bytearray([0])))

# Run one request/reply exchange against a responder thread.
w = ResponderThread()
# Daemon thread: if requester() raises (for example KeyboardInterrupt while
# it is blocked in read), setting w.quit cannot wake a responder that is
# itself blocked in os.read, and a non-daemon thread would then keep the
# interpreter from exiting.
w.daemon = True
w.start()
try:
    requester()
finally:
    w.quit = True
# Normal path: requester() has sent the zero byte, so the responder returns.
w.join()

#    Copyright (C) 2010 Jeff Epler
#
#    This program is free software; you can redistribute it and/or modify
#    it under the terms of the GNU General Public License as published by
#    the Free Software Foundation; either version 2 of the License, or
#    (at your option) any later version.
#
#    This program is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#    GNU General Public License for more details.
#
#    You should have received a copy of the GNU General Public License
#    along with this program; if not, write to the Free Software
#    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA

------------------------------------------------------------------------------
ThinkGeek and WIRED's GeekDad team up for the Ultimate 
GeekDad Father's Day Giveaway. ONE MASSIVE PRIZE to the 
lucky parental unit.  See the prize list and enter to win: 
http://p.sf.net/sfu/thinkgeek-promo
_______________________________________________
Emc-developers mailing list
Emc-developers@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/emc-developers

Reply via email to