Hi,
For several months I have been working with the Telepathy and Farsight
frameworks to create a videoconferencing application.
Along the way, I maintained a standalone test program that exchanges
media between two contacts through Telepathy.
I would be happy if it could help someone.
Regards,
--
Fabien LOUIS <flo...@viotech.net>
"""
Example of a videoconferencing application using Farsight through
Telepathy.
@author: Fabien LOUIS, flo...@viotech.net
@date: December 2009
Copyright (C) 2009 Viotech Communications
This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public
License as published by the Free Software Foundation; either
version 2 of the License, or (at your option) any later version.
This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public
License along with this library; if not, write to the
Free Software Foundation, Inc., 59 Temple Place - Suite 330,
Boston, MA 02111-1307, USA.
@Usage:
Run two clients, one with the "slave" parameter and the other with the
"master" parameter (the master account initiates the call, the slave
accepts it).
@Scenario:
We start by connecting each account, first the slave and then the master
(see on_status_changed_user).
Once the accounts are connected, we advertise CHANNEL_TYPE_STREAMED_MEDIA
capabilities on them (see enable_capabilities) and the MASTER creates a
Telepathy channel (see create_telepathy_channel).
When the CHANNEL_TYPE_STREAMED_MEDIA channel is created and ready
(each contact receives it), we create a TfListener. Only the MASTER
offers a stream on it for the SLAVE; the SLAVE checks whether streams
already exist on the Telepathy channel and, if so, accepts them.
TfListener is a component which encapsulates Farsight management (session
creation, stream creation, src pad adding, etc.).
We create a TfChannel linked to the Telepathy channel path and connect it
to several Farsight signals.
Once the stream is created and accepted, the Farsight part starts.
First, we get the "session-created" signal, which calls
__on_session_created: we create the pipeline, add the conference element
to it, set the pipeline to PLAYING and forward all bus messages to the
TfChannel.
Second, we get the "stream-get-codec-config" signal, which calls
__on_stream_get_codec_config: we return our codec configuration.
Third, we get "stream-created", which calls __on_stream_created: we
connect signals on the stream, add a source, and link the source's src
pad to the stream's sink pad.
If everything works (codec negotiation and so on), we get
"src-pad-added", which calls __on_src_pad_added: we display the stream's
view.
"""
##################################################
# Fill in two distinct Jabber/XMPP accounts before running
MASTER_ACCOUNT = ""
MASTER_PASSWORD = ""
SLAVE_ACCOUNT = ""
SLAVE_PASSWORD = ""
##################################################
import pygst
pygst.require('0.10')
import sys, os
import gobject, dbus.glib
import tpfarsight
import farsight, gst
os.environ["GST_PLUGIN_PATH"] = "/usr/local/lib/gstreamer-0.10"
##
## Debugging
##
#os.environ["GST_DEBUG"] = "fsrtp*:5"
#os.environ["GST_DEBUG"] = "1,fs*:2,rtp*:1"
#os.environ["GST_DEBUG"] = "*PAD*:5,*bin*:5"
import telepathy
from telepathy.interfaces import (
    CONNECTION,
    CONNECTION_MANAGER,
    CONNECTION_INTERFACE_CAPABILITIES,
    CHANNEL_TYPE_STREAMED_MEDIA,
    CONNECTION_INTERFACE_REQUESTS,
    CHANNEL_INTERFACE_GROUP,
)
from telepathy.constants import (
    CONNECTION_STATUS_CONNECTED,
    HANDLE_TYPE_CONTACT,
    MEDIA_STREAM_TYPE_AUDIO,
    MEDIA_STREAM_TYPE_VIDEO,
)
##################################################
def debug_callback(self, *args, **kwargs):
    """
    Debug callback for otherwise unused signals.
    """
    print "debug_callback"
    # Print all positional arguments
    for arg in args:
        print "\t[arg] %s" % str(arg)
    # Print all keyword arguments
    for kwarg in kwargs:
        print "\t[kwarg] %s: %s" % (kwarg, kwargs[kwarg])
def on_status_changed_user(state, reason):
    """
    Called when the connection status changes.
    """
    if state == CONNECTION_STATUS_CONNECTED:
        print "Connected"
        gobject.timeout_add(2000, enable_capabilities)
        if mode == "master":
            print "Creating channel"
            gobject.timeout_add(5000, create_telepathy_channel)
        else:
            print "Waiting for streams..."
def enable_capabilities():
    """
    Advertise CHANNEL_TYPE_STREAMED_MEDIA capabilities.
    """
    # 15 is the Channel_Media_Capability bitmask: audio | video |
    # NAT traversal via STUN | NAT traversal via gtalk-p2p
    conn[CONNECTION_INTERFACE_CAPABILITIES].AdvertiseCapabilities(
        [(CHANNEL_TYPE_STREAMED_MEDIA, 15)], [])
def create_telepathy_channel():
    """
    Request the handle for SLAVE and create a StreamedMedia channel
    between the two contacts.
    """
    # Request the contact handle
    contact_handle = conn[CONNECTION].RequestHandles(HANDLE_TYPE_CONTACT,
                                                     [SLAVE_ACCOUNT])[0]
    # Create a CHANNEL_TYPE_STREAMED_MEDIA channel with that handle
    conn[CONNECTION_INTERFACE_REQUESTS].CreateChannel({
        "org.freedesktop.Telepathy.Channel.ChannelType":
            CHANNEL_TYPE_STREAMED_MEDIA,
        "org.freedesktop.Telepathy.Channel.TargetHandleType":
            HANDLE_TYPE_CONTACT,
        "org.freedesktop.Telepathy.Channel.TargetHandle":
            contact_handle})
def on_new_channel_user(object_path, channel_type, handle_type,
                        handle, suppress_handler):
    """
    Called when a new channel appears. When a CHANNEL_TYPE_STREAMED_MEDIA
    channel is created and ready (each contact receives this signal), we
    create a TfListener. Only the MASTER offers a stream on it for the
    SLAVE; the SLAVE checks whether streams already exist on the Telepathy
    channel and, if so, accepts them.
    """
    if channel_type == CHANNEL_TYPE_STREAMED_MEDIA:
        print "New %s channel" % (channel_type)
        # Get the Telepathy channel
        channel = telepathy.client.channel.Channel(conn.service_name,
                                                   object_path,
                                                   conn.bus)
        # Create a TfListener
        TfListener(conn, object_path)
        if mode == "master":  # Request a stream
            # Get the contact handle
            contact_handle = conn[CONNECTION].RequestHandles(
                HANDLE_TYPE_CONTACT,
                [SLAVE_ACCOUNT])[0]
            # Request a video stream
            channel[CHANNEL_TYPE_STREAMED_MEDIA].RequestStreams(
                contact_handle,
                [MEDIA_STREAM_TYPE_VIDEO
                 # , MEDIA_STREAM_TYPE_AUDIO
                 ])
        else:
            # Check whether streams already exist, and accept them
            if channel[CHANNEL_TYPE_STREAMED_MEDIA].ListStreams() != []:
                # Accept the stream by adding ourself to the group
                channel[CHANNEL_INTERFACE_GROUP].AddMembers(
                    [conn[CONNECTION].GetSelfHandle()],
                    "")
##################################################
class TfListener(object):
    """
    TfListener is a component which encapsulates Farsight management
    (session creation, stream creation, src pad adding, etc.).
    We create a TfChannel linked to the Telepathy channel path and connect
    it to several Farsight signals.
    """

    def __init__(self, connection, chan_object_path):
        """
        @param connection: current connection
        @type connection: Telepathy Connection
        @param chan_object_path: object path of the StreamedMedia channel
        @type chan_object_path: String
        """
        super(TfListener, self).__init__()
        self.pipeline = None
        self.conn = connection
        # Create a TfChannel linked to the Telepathy channel path
        self.tf_channel = tpfarsight.Channel(
            connection_busname=self.conn.service_name,
            connection_path=self.conn.object_path,
            channel_path=chan_object_path)
        # Connect to several signals
        print "connecting to channel", self.tf_channel
        self.tf_channel.connect('session-created', self.__on_session_created)
        self.tf_channel.connect('stream-created', self.__on_stream_created)
        self.tf_channel.connect('stream-get-codec-config',
                                self.__on_stream_get_codec_config)
    def __on_session_created(self, channel, conference, participant):
        """
        On "session-created", create the pipeline, add the conference
        element to it, set the pipeline to PLAYING and forward all bus
        messages to the TfChannel.
        """
        print
        print "=== %s __on_session_created ===" % self
        print
        self.pipeline = gst.Pipeline()
        self.pipeline.add(conference)
        self.pipeline.set_state(gst.STATE_PLAYING)
        # Forward all bus messages
        self.pipeline.get_bus().add_watch(self.__async_handler)
    def __on_stream_get_codec_config(self, channel, stream_id, media_type,
                                     direction):
        """
        On "stream-get-codec-config", return our codec configuration.
        """
        print
        print "=== %s __on_stream_get_codec_config ===" % self
        print
        if media_type == farsight.MEDIA_TYPE_VIDEO:
            codecs = [farsight.Codec(farsight.CODEC_ID_ANY, "H264",
                                     farsight.MEDIA_TYPE_VIDEO, 0)]
            if self.conn.GetProtocol() == "sip":
                codecs += [farsight.Codec(farsight.CODEC_ID_DISABLE, "THEORA",
                                          farsight.MEDIA_TYPE_VIDEO, 0)]
            else:
                codecs += [farsight.Codec(farsight.CODEC_ID_ANY, "THEORA",
                                          farsight.MEDIA_TYPE_VIDEO, 0)]
            codecs += [
                farsight.Codec(farsight.CODEC_ID_ANY, "H263",
                               farsight.MEDIA_TYPE_VIDEO, 0),
                farsight.Codec(farsight.CODEC_ID_ANY, "JPEG",
                               farsight.MEDIA_TYPE_VIDEO, 0),
                farsight.Codec(farsight.CODEC_ID_ANY, "MPV",
                               farsight.MEDIA_TYPE_VIDEO, 0),
                farsight.Codec(farsight.CODEC_ID_DISABLE, "DV",
                               farsight.MEDIA_TYPE_VIDEO, 0),
            ]
            return codecs
        else:
            return None
    def __on_stream_created(self, channel, stream):
        """
        On "stream-created", connect signals on the stream and add a
        source, then link the source's src pad to the stream's sink pad.
        """
        print
        print "=== %s __on_stream_created ===" % self
        print
        stream.connect('src-pad-added', self.__on_src_pad_added)
        stream.connect('closed', debug_callback, "closed")       # Not used
        stream.connect('error', debug_callback, "error")         # Not used
        stream.connect('free-resource', debug_callback, "free")  # Not used
        # Create the source bin
        media_type = stream.get_property("media-type")
        if media_type == farsight.MEDIA_TYPE_AUDIO:
            source = gst.parse_bin_from_description(
                "audiotestsrc", True)
        elif media_type == farsight.MEDIA_TYPE_VIDEO:
            if mode == "master":
                source = gst.parse_bin_from_description(
                    # "v4l2src device='/dev/video0'", True)
                    "videotestsrc", True)
            else:
                source = gst.parse_bin_from_description(
                    # "v4l2src device='/dev/video1'", True)
                    "videotestsrc", True)
        self.pipeline.add(source)
        source.get_pad("src").link(stream.get_property("sink-pad"))
        source.set_state(gst.STATE_PLAYING)
    def __on_src_pad_added(self, stream, pad, codec):
        """
        On "src-pad-added", display the stream's view.
        """
        print
        print "=== %s __on_src_pad_added ===" % self
        print
        media_type = stream.get_property("media-type")
        if media_type == farsight.MEDIA_TYPE_AUDIO:
            queue_sink = gst.parse_bin_from_description(
                "queue ! audioconvert ! audioresample ! "
                "audioconvert ! autoaudiosink", True)
            audioadder = gst.element_factory_make("liveadder")
            tee = gst.element_factory_make("tee")
            # Add & link
            self.pipeline.add(audioadder, tee, queue_sink)
            gst.element_link_many(audioadder, tee, queue_sink)
            pad.link(audioadder.get_pad("sink%d"))
            queue_sink.set_state(gst.STATE_PLAYING)
            tee.set_state(gst.STATE_PLAYING)
            audioadder.set_state(gst.STATE_PLAYING)
        elif media_type == farsight.MEDIA_TYPE_VIDEO:
            queue_ff = gst.parse_bin_from_description(
                "queue ! ffmpegcolorspace", True)
            if USE_CLUTTER_TO_DISPLAY:
                sink = cluttergst.VideoSink(video_texture)
            else:
                sink = gst.parse_bin_from_description(
                    "videoscale ! video/x-raw-yuv,width=320,height=240 ! "
                    "autovideosink", True)
            videofunnel = gst.element_factory_make("fsfunnel")
            tee = gst.element_factory_make("tee")
            # Add & link
            self.pipeline.add(videofunnel, tee, queue_ff, sink)
            gst.element_link_many(videofunnel, tee, queue_ff, sink)
            pad.link(videofunnel.get_pad("sink%d"))
            sink.set_state(gst.STATE_PLAYING)
            queue_ff.set_state(gst.STATE_PLAYING)
            tee.set_state(gst.STATE_PLAYING)
            videofunnel.set_state(gst.STATE_PLAYING)
        self.pipeline.set_state(gst.STATE_PLAYING)
    def __async_handler(self, bus, message):
        """
        Watch all bus messages and forward them to the TfChannel
        (required for telepathy-farsight to work).
        """
        if self.tf_channel is not None:
            self.tf_channel.bus_message(message)
        return True
##################################################
##################################################
if __name__ == '__main__':
    # Parse parameters
    if len(sys.argv) < 2:
        print "Usage: %s {master|slave} [--enable clutter]" % sys.argv[0]
        sys.exit(1)
    if len(sys.argv) == 4 and sys.argv[2] == "--enable" \
            and sys.argv[3] == "clutter":
        USE_CLUTTER_TO_DISPLAY = True
    else:
        USE_CLUTTER_TO_DISPLAY = False
    if USE_CLUTTER_TO_DISPLAY:
        import clutter
        import cluttergst
        clutter.threads_init()
        clutter.init()
    ##
    ## Mode
    ##
    mode = sys.argv[1]
    print "=== %s ===" % sys.argv[0]
    print "Mode: %s" % mode
    # Choose the account parameters for this mode
    if mode == "master":
        parameters = {"account": MASTER_ACCOUNT, "password": MASTER_PASSWORD,
                      "port": dbus.UInt32(5222),
                      "server": MASTER_ACCOUNT.split("@")[-1]}
    else:
        parameters = {"account": SLAVE_ACCOUNT, "password": SLAVE_PASSWORD,
                      "port": dbus.UInt32(5222),
                      "server": SLAVE_ACCOUNT.split("@")[-1]}
    ##
    ## Clutter
    ##
    if USE_CLUTTER_TO_DISPLAY:
        stage = clutter.Stage(default=True)
        stage.set_size(320, 240)
        stage.set_color(clutter.color_from_string('DarkSlateGrey'))
        video_texture = clutter.Texture()
        video_texture.show()
        stage.add(video_texture)
    ##
    ## Telepathy
    ##
    # Get the requested connection manager
    reg = telepathy.client.ManagerRegistry()
    reg.LoadManagers()
    manager = reg.GetManager("gabble")
    # Request a connection
    conn_bus_name, conn_object_path = manager[CONNECTION_MANAGER].\
        RequestConnection("jabber", parameters)
    conn = telepathy.client.Connection(conn_bus_name, conn_object_path)
    # Listen for signals
    conn[CONNECTION].connect_to_signal("NewChannel", on_new_channel_user)
    conn[CONNECTION].connect_to_signal("StatusChanged",
                                       on_status_changed_user)
    # Connect
    conn[CONNECTION].Connect()
    ##
    ## Main loop
    ##
    if USE_CLUTTER_TO_DISPLAY:
        stage.show_all()
        clutter.main()
    else:
        loop = gobject.MainLoop()
        try:
            loop.run()
        except KeyboardInterrupt:
            print 'interrupted'
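A note on the `15` passed to AdvertiseCapabilities in enable_capabilities: it is a bitmask of Channel_Media_Capability flags. A minimal sketch of how it decomposes, assuming the flag values from the Telepathy D-Bus spec (telepathy-python exports constants with these names in telepathy.constants; they are restated here so the snippet is self-contained):

```python
# Channel_Media_Capability flag values, assumed from the Telepathy spec
CHANNEL_MEDIA_CAPABILITY_AUDIO = 1
CHANNEL_MEDIA_CAPABILITY_VIDEO = 2
CHANNEL_MEDIA_CAPABILITY_NAT_TRAVERSAL_STUN = 4
CHANNEL_MEDIA_CAPABILITY_NAT_TRAVERSAL_GTALK_P2P = 8

# The capability bitmask advertised by enable_capabilities():
# audio + video, with STUN and gtalk-p2p NAT traversal
STREAMED_MEDIA_CAPS = (CHANNEL_MEDIA_CAPABILITY_AUDIO |
                       CHANNEL_MEDIA_CAPABILITY_VIDEO |
                       CHANNEL_MEDIA_CAPABILITY_NAT_TRAVERSAL_STUN |
                       CHANNEL_MEDIA_CAPABILITY_NAT_TRAVERSAL_GTALK_P2P)
assert STREAMED_MEDIA_CAPS == 15
```

Dropping the NAT-traversal bits (i.e. advertising 3 instead of 15) would still offer audio and video, but peers behind NAT may then fail to connect.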
_______________________________________________
Farsight-devel mailing list
Farsight-devel@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/farsight-devel