#!/usr/bin/python
r"""Demonstration program for termios VMIN, VTIME

Usage: titest [-n] [-s] [-t] [-l] [-0] [-r #] [-w #] [-z #]
    -n:   Set non-blocking mode in requester
          Default: off
    -s:   Use select before read in requester
          Default: off
    -t:   Give a reply that is one byte shorter than requested
          Default: off
    -l:   Give a reply that is one byte longer than requested
          Default: off
    -0:   Give a zero-byte reply
          Default: off
    -r #: Set VTIME to (r+99)/100 (i.e. # is VTIME in milliseconds)
          Default: 300
    -w #: Set writer inter-character time to # milliseconds
          Default: 200
    -z #: Set request size in bytes (0..255; it is sent as a single byte)
          Default: 4

Theory:
    The program allocates a pseudoterminal and then starts a 'responder'
    thread.

    The responder thread reads one byte; the value of that byte is the
    length of the response (possibly affected by the -t, -l or -0 flags).
    The responder writes one character at a time, with a delay after each
    character (-w).  When it receives a '\0' request, it exits.

    The main thread invokes the 'requester'.  The requester sets
    non-canonical mode, VMIN according to the request size (-z), and VTIME
    according to the -r argument.  Then it optionally sets non-blocking
    mode (-n), optionally selects (-s), and then reads.  After the read (or
    failing select) completes, it prints the result.

    (Here, 'master' and 'slave' refer to the two ends of the
    pseudoterminal.  'slave' behaves like a tty device but actually
    communicates with the file descriptor 'master'.  This is not related to
    the modbus concept of master and slave devices)

Results:
    In non-blocking mode, at most one byte is ever read.

    Without select, a zero-length reply causes the requester to block
    forever.  (may require killing the test program with a signal)

    When the requester wait is not longer than the responder delay, a short
    read (almost always 1 byte) results.

    When the requester wait is longer than the responder delay, select is
    enabled, and non-blocking mode is not requested, then all three cases
    (no reply, short reply, desired reply) work.

    A "too long" response is not detected and extra bytes from an earlier
    over-length request will be the first bytes presented at a subsequent
    read.

    (All tests were performed on Ubuntu 10.04 with stock kernel)

Relation to modbus master:
    Termios VTIME and VMIN plus select are an adequate mechanism for
    receiving modbus replies.  A non-responsive slave, an error reply, and
    a correct reply can all be received without looping on read or
    read+select.  Non-blocking mode is harmful and should not be used;
    select ensures the program won't block forever.
"""

import fcntl
import getopt
import os
import pty
import select
import sys
import termios
import threading
import time

# Experiment switches (each flag toggles its switch, so giving a flag twice
# turns it back off) and numeric parameters.
do_nonblock = do_select = do_short = do_long = do_noreply = False
request_count = 4        # -z: bytes the requester asks for
responder_delay = 200    # -w: responder inter-character delay, milliseconds
requester_wait = 300     # -r: requester timeout, milliseconds

try:
    opts, args = getopt.getopt(sys.argv[1:], 'nstl0r:w:z:')
    for k, v in opts:
        if k == '-n':
            do_nonblock = not do_nonblock
        elif k == '-s':
            do_select = not do_select
        elif k == '-t':
            do_short = not do_short
        elif k == '-l':
            do_long = not do_long
        elif k == '-0':
            do_noreply = not do_noreply
        elif k == '-w':
            responder_delay = int(v)
        elif k == '-r':
            requester_wait = int(v)
        elif k == '-z':
            request_count = int(v)
except (getopt.GetoptError, ValueError) as err:
    # Unknown flag, missing argument, or a non-integer value: report it in
    # one line instead of dumping a traceback.
    sys.exit("titest: %s" % err)

# The request length travels to the responder as a single byte, so it has to
# fit in one.
if not 0 <= request_count <= 255:
    sys.exit("titest: -z must be between 0 and 255")

master, slave = pty.openpty()


class ResponderThread(threading.Thread):
    """Simulated device attached to the master end of the pseudoterminal.

    Protocol: read one request byte whose value is the wanted reply length;
    a zero byte means "exit".  The reply is that many "A" characters
    (adjusted by -t / -l, or suppressed entirely by -0), written one at a
    time with ``responder_delay`` milliseconds of sleep after each.

    Setting ``quit`` to True from another thread asks the responder to stop.
    The request wait is a ``select`` with a short timeout rather than a bare
    blocking read, so the flag is noticed even when no further request byte
    ever arrives; otherwise ``join()`` could hang forever.
    """

    # Seconds to wait for a request before re-checking ``quit``.
    POLL_INTERVAL = 0.1

    def __init__(self):
        threading.Thread.__init__(self)
        self.quit = False

    def run(self):
        """Serve requests until told to quit or a zero request arrives."""
        while not self.quit:
            ready, _, _ = select.select([master], [], [],
                                        self.POLL_INTERVAL)
            if not ready:
                continue  # nothing yet; loop around to re-check self.quit

            c = os.read(master, 1)
            # An empty read means end of input; ord() would fail on it.
            if self.quit or not c:
                return
            o = ord(c)
            if not o:
                return  # '\0' request: the requester is done
            if do_noreply:
                continue
            if do_short:
                o -= 1
            if do_long:
                o += 1
            for i in range(o):
                if self.quit:
                    return
                # Bytes literal so the write works on Python 2 and 3 alike.
                os.write(master, b"A")
                time.sleep(responder_delay / 1000.)
def _request_byte(value):
    """Return ``value`` (0..255) as a one-byte string for ``os.write``.

    ``bytes(bytearray(...))`` yields ``str`` on Python 2 and ``bytes`` on
    Python 3, unlike ``chr()``, whose result ``os.write`` rejects on
    Python 3.  Raises ValueError when ``value`` does not fit in a byte.
    """
    return bytes(bytearray([value]))


def requester():
    """Run one request/reply transaction on the slave end and print the result.

    Puts the slave tty into non-canonical mode with VMIN set to
    ``request_count`` and VTIME set to ``requester_wait`` rounded up to
    tenths of a second, optionally makes the descriptor non-blocking (-n),
    sends the request byte, optionally waits in ``select`` (-s), then reads
    and prints how many bytes arrived.

    A zero byte is always written on the way out, including when an
    exception is raised, so that the responder thread is told to exit and
    the caller's ``join()`` cannot hang waiting for it.
    """
    try:
        attr = termios.tcgetattr(slave)
        attr[6][termios.VMIN] = request_count
        # Floor division keeps VTIME an integer on Python 3 as well.
        attr[6][termios.VTIME] = (requester_wait + 99) // 100
        attr[3] &= ~(termios.ICANON | termios.ECHO)
        # Send request bytes verbatim: with output post-processing enabled
        # (OPOST|ONLCR is the usual tty default) a request size of 10, i.e.
        # a newline byte, would reach the responder as "\r\n".
        attr[1] &= ~termios.OPOST
        termios.tcsetattr(slave, termios.TCSANOW, attr)

        if do_nonblock:
            flags = fcntl.fcntl(slave, fcntl.F_GETFL)
            fcntl.fcntl(slave, fcntl.F_SETFL, flags | os.O_NONBLOCK)

        os.write(slave, _request_byte(request_count))

        if do_select:
            readable, _, _ = select.select([slave], [], [],
                                           requester_wait / 1000.)
            if not readable:
                print("Wanted %d, select failed" % (request_count))
                return

        got = len(os.read(slave, request_count))
        print("Wanted %d, got %d" % (request_count, got))
    finally:
        # '\0' is the responder's signal to exit.
        os.write(slave, _request_byte(0))


w = ResponderThread()
w.start()
try:
    requester()
finally:
    w.quit = True
    w.join()

#    Copyright (C) 2010 Jeff Epler
#
#    This program is free software; you can redistribute it and/or modify
#    it under the terms of the GNU General Public License as published by
#    the Free Software Foundation; either version 2 of the License, or
#    (at your option) any later version.
#
#    This program is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#    GNU General Public License for more details.
#
#    You should have received a copy of the GNU General Public License
#    along with this program; if not, write to the Free Software
#    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA

# ------------------------------------------------------------------------------
# ThinkGeek and WIRED's GeekDad team up for the Ultimate
# GeekDad Father's Day Giveaway. ONE MASSIVE PRIZE to the
# lucky parental unit.  See the prize list and enter to win:
# http://p.sf.net/sfu/thinkgeek-promo
# _______________________________________________
# Emc-developers mailing list
# Emc-developers@lists.sourceforge.net
# https://lists.sourceforge.net/lists/listinfo/emc-developers