Hi,

For several months I worked with Telepathy and Farsight frameworks to
create a videoconferencing application.

For this, I maintained an autonomous testing program to make data
exchange between two contacts with Telepathy.

I would be happy if this could help someone.

Regards,
-- 
Fabien LOUIS <flo...@viotech.net>
"""
Example to create a videoconferencing application using Farsight throught
Telepathy

@author: Fabien LOUIS, flo...@viotech.net
@date: December 2009

Copyright (C) 2009 Viotech Communications

This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public
License as published by the Free Software Foundation; either
version 2 of the License, or (at your option) any later version.

This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
Lesser General Public License for more details.

You should have received a copy of the GNU Lesser General Public
License along with this library; if not, write to the
Free Software Foundation, Inc., 59 Temple Place - Suite 330,
Boston, MA 02111-1307, USA.

@Usage:
Just run two clients, one with "slave" parameter and seconds with "master"
parameter (master account initiate the call or slave accept the call).

@Scenario:
We start by connect each accounts (first the slave and then the master).
    (see function on_status_changed_user)
When accounts are connected, we enable CHANNEL_TYPE_STREAMED_MEDIA capabilities
    (see function function enable_capabilities)
on it and MASTER create a telepathy account.
    (see function function create_telepathy_channel)

When the channel CHANNEL_TYPE_STREAMED_MEDIA is created and ready
(received by each contacts), we create an TfListener. But only the MASTER
offer a stream on it for SLAVE. SLAVE look if some streams already exists
on the telepathy channel and in this case, we accept the stream.

TfListener is a component which encapsulate farsight management (session
creation, stream creation, src pad adding, etc.).

We create a tfChannel linked with the telepathy channel path and connect it
with some farsight signals.

Once the stream is created and accepted, the farsight part started.
First, we get 'session-created' signal which call __on_session_created
    On signal "session-created", we create the pipeline and add conference
    on it. We set pipeline to PLAYING and we transfer all bus message to it.
    
Second, we get 'stream-get-codec-config' signal which call __on_stream_get_codec_config
    On signal "stream-get-codec-config", we returns our codec configuration

Third, we get 'stream-created' which call __on_stream_created
    On signal "stream-created", connect signals on stream and add source.
    Then we link the stream's sink-pad  to source's src-pad.

If all works (codec negotiation and other things), we get 'src-pad-added' which
call __on_src_pad_added.
    On signal "src-pad-added", we display stream's view
"""

##################################################
MASTER_ACCOUNT = ""
MASTER_PASSWORD = ""

SLAVE_ACCOUNT = ""
SLAVE_PASSWORD = ""
##################################################

import pygst
pygst.require('0.10')

import sys, os
import gobject, dbus.glib
import tpfarsight
import farsight, gst

os.environ["GST_PLUGIN_PATH"] = "/usr/local/lib/gstreamer-0.10"

##
## Debugging
##
#os.environ["GST_DEBUG"] = "fsrtp*:5"
#os.environ["GST_DEBUG"] = "1,fs*:2,rtp*:1"
#os.environ["GST_DEBUG"] = "*PAD*:5,*bin*:5"

import telepathy
from telepathy.interfaces import (
    CONNECTION,
    CONNECTION_MANAGER,
    CONNECTION_INTERFACE_CAPABILITIES,
    CHANNEL_TYPE_STREAMED_MEDIA,
    CONNECTION_INTERFACE_REQUESTS,
    CHANNEL_INTERFACE_GROUP,
)
from telepathy.constants import (
    CONNECTION_STATUS_CONNECTED,
    HANDLE_TYPE_CONTACT,
    MEDIA_STREAM_TYPE_AUDIO,
    MEDIA_STREAM_TYPE_VIDEO,
)

##################################################
def debug_callback(self, *args, **kwargs):
    """
    Debug function for unused signal    
    """
    
    print "debug_callback"
    # Print all kwarg
    for arg in args:
        print "\t[arg] %s" % str(arg)       
    # Print all kwarg
    for kwarg in kwargs:
        print "\t[kwarg]%s: %s" % (kwarg, kwargs[kwarg])
        
def on_status_changed_user(state, reason):
    """
    When a user status changed
    """

    if state == CONNECTION_STATUS_CONNECTED:
        print "Connected"
        gobject.timeout_add(2000, enable_capabilities)
        if mode == "master":
            print "Creating channel"
            gobject.timeout_add(5000, create_telepathy_channel)
        else:
            print "Waiting streams..."

def enable_capabilities():
    """
    Enable CHANNEL_TYPE_STREAMED_MEDIA capabilities
    """
    
    conn[CONNECTION_INTERFACE_CAPABILITIES].AdvertiseCapabilities(
                            [(CHANNEL_TYPE_STREAMED_MEDIA, 15)], [])

def create_telepathy_channel():
    """
    Request the handle for SLAVE and create a telepathy channel between contacts
    """
    
    # Request contact handle
    contact_handle = conn[CONNECTION].RequestHandles(HANDLE_TYPE_CONTACT,
                                                     [SLAVE_ACCOUNT])[0]    
    # Create CHANNEL_TYPE_STREAMED_MEDIA with handle
    conn[CONNECTION_INTERFACE_REQUESTS].CreateChannel({
                "org.freedesktop.Telepathy.Channel.ChannelType":
                    CHANNEL_TYPE_STREAMED_MEDIA,
                "org.freedesktop.Telepathy.Channel.TargetHandleType":
                    HANDLE_TYPE_CONTACT,
                "org.freedesktop.Telepathy.Channel.TargetHandle":
                    contact_handle})
            
def on_new_channel_user(object_path, channel_type, handle_type,
                         handle, suppress_handler):
    """
    When the channel CHANNEL_TYPE_STREAMED_MEDIA is created and ready
    (received by each contacts), we create an TfListener. But only the MASTER
    offer a stream on it for SLAVE. SLAVE look if some streams already exists
    on the telepathy channel and in this case, we accept the stream.
    """
    
    if channel_type == CHANNEL_TYPE_STREAMED_MEDIA:
        print "New %s channel" % (channel_type)
        
        # Get telepathy channel
        channel = telepathy.client.channel.Channel(conn.service_name,
                                                   object_path,
                                                   conn.bus)

        # Create a tf listener
        TfListener(conn, object_path)
        
        if mode == "master":    # Request a stream
            # Get contact handle
            contact_handle = conn[CONNECTION].RequestHandles(
                                                            HANDLE_TYPE_CONTACT,
                                                            [SLAVE_ACCOUNT])[0]
            # Request stream
            channel[CHANNEL_TYPE_STREAMED_MEDIA].RequestStreams(
                                                contact_handle,
                                                [MEDIA_STREAM_TYPE_VIDEO
#                                                ,MEDIA_STREAM_TYPE_AUDIO
                                                ])
        else:
            # Regarding if streams are already in, and accepted them
            if channel[CHANNEL_TYPE_STREAMED_MEDIA].ListStreams() != []:
                # Accept the stream by adding  owner
                channel[CHANNEL_INTERFACE_GROUP].AddMembers(
                                        [conn[CONNECTION].GetSelfHandle()],
                                        "")

##################################################        
class TfListener(object):
    """
    TfListener is a component which encapsulate farsight management (session
    creation, stream creation, src pad adding, etc.).
    
    We create a tfChannel linked with the telepathy channel path and connect it
    with some farsight signals.
    """
    
    def __init__(self, connection, chan_object_path):
        """
        Init
        
        @param connection: current connection
        @type connection: Telepathy Connection
        
        @param chan_object_path: Object path of the StreamedMedia channel
        @type chan_object_path: String
        """
        
        super(TfListener, self).__init__()        
        self.pipeline = None
        self.conn = connection
        
        # We create a tfChannel linked with the telepathy channel path.
        self.tf_channel = tpfarsight.Channel(
                                     connection_busname=self.conn.service_name,
                                     connection_path=self.conn.object_path,
                                     channel_path=chan_object_path)
        # Connect to several signals
        print "connecting to channel", self.tf_channel
        self.tf_channel.connect('session-created', self.__on_session_created)
        self.tf_channel.connect('stream-created', self.__on_stream_created)
        self.tf_channel.connect('stream-get-codec-config',
                                self.__on_stream_get_codec_config)

    def __on_session_created(self, channel, conference, participant):
        """
        On signal "session-created", we create the pipeline and add conference
        on it. We set pipeline to PLAYING and we transfer all bus message to it.
        """
        
        print
        print "=== %s __on_session_created ===" % self
        print
       
        self.pipeline = gst.Pipeline()        
        self.pipeline.add(conference)
        self.pipeline.set_state(gst.STATE_PLAYING)
        
        # Transfer all bus message
        self.pipeline.get_bus().add_watch(self.__async_handler)

    def __on_stream_get_codec_config(self, channel, stream_id, media_type,
                                     direction):
        """
        On signal "stream-get-codec-config", we returns our codec configuration
        """
        
        print
        print "=== %s __on_stream_get_codec_config ===" % self
        print
        
        if media_type == farsight.MEDIA_TYPE_VIDEO:
            codecs = [farsight.Codec(farsight.CODEC_ID_ANY, "H264",
                                     farsight.MEDIA_TYPE_VIDEO, 0)]
            
            if self.conn.GetProtocol() == "sip" :
                codecs += [ farsight.Codec(farsight.CODEC_ID_DISABLE, "THEORA",
                                        farsight.MEDIA_TYPE_VIDEO, 0) ]
            else:
                codecs += [ farsight.Codec(farsight.CODEC_ID_ANY, "THEORA",
                                        farsight.MEDIA_TYPE_VIDEO, 0) ]
            
            codecs += [
                farsight.Codec(farsight.CODEC_ID_ANY, "H263",
                                        farsight.MEDIA_TYPE_VIDEO, 0),
                farsight.Codec(farsight.CODEC_ID_ANY, "JPEG",
                                        farsight.MEDIA_TYPE_VIDEO, 0),
                farsight.Codec(farsight.CODEC_ID_ANY, "MPV",
                                        farsight.MEDIA_TYPE_VIDEO, 0),
                farsight.Codec(farsight.CODEC_ID_DISABLE, "DV",
                                        farsight.MEDIA_TYPE_VIDEO, 0),
            ]

            return codecs
        else:
            return None

    def __on_stream_created(self, channel, stream):
        """
        On signal "stream-created">, connect signals on stream and add source.
        Then we link the stream's sink-pad  to source's src-pad.
        """
        
        print
        print "=== %s __on_stream_created ===" % self
        print
        
        stream.connect('src-pad-added', self.__on_src_pad_added)
        stream.connect('closed', debug_callback, "closed")          # Not used
        stream.connect('error', debug_callback, "error")            # Not used
        stream.connect('free-resource', debug_callback, "free")     # Not used
        
        # creating src pipes
        type = stream.get_property ("media-type")
        if type == farsight.MEDIA_TYPE_AUDIO:
            source = gst.parse_bin_from_description (
                            "audiotestsrc", True) 
        
        elif type == farsight.MEDIA_TYPE_VIDEO:
            if mode == "master":
                source = gst.parse_bin_from_description (
#                            "v4l2src device='/dev/video0'", True)
                            "videotestsrc", True)
            else:
                source = gst.parse_bin_from_description (
#                            "v4l2src device='/dev/video1'", True)
                            "videotestsrc", True)                
    
        self.pipeline.add(source)        
        source.get_pad("src").link(stream.get_property("sink-pad"))        
        source.set_state(gst.STATE_PLAYING)
    
    def __on_src_pad_added (self, stream, pad, codec):
        """
        On signal "src-pad-added", we display stream view
        """
        
        print
        print "=== %s __src_pad_added ===" % self
        print
        
        type = stream.get_property ("media-type")
        if type == farsight.MEDIA_TYPE_AUDIO:
            queue_sink = gst.parse_bin_from_description("queue ! audioconvert ! audioresample ! audioconvert ! autoaudiosink", True)
            
            audioadder = gst.element_factory_make("liveadder")
            tee = gst.element_factory_make("tee")
            
            # Add & Link
            self.pipeline.add(audioadder, tee, queue_sink)
            gst.element_link_many(audioadder, tee, queue_sink)            
            pad.link(audioadder.get_pad("sink%d"))            
            
            queue_sink.set_state(gst.STATE_PLAYING)
            tee.set_state(gst.STATE_PLAYING)
            audioadder.set_state(gst.STATE_PLAYING)
                
        elif type == farsight.MEDIA_TYPE_VIDEO:
            queue_ff = gst.parse_bin_from_description("queue ! ffmpegcolorspace", True)

            if USE_CLUTTER_TO_DISPLAY:
                sink = cluttergst.VideoSink(video_texture)
            else:
                sink = gst.parse_bin_from_description("videoscale ! video/x-raw-yuv,width=320,height=240 ! autovideosink", True)
            
            
            videofunnel = gst.element_factory_make("fsfunnel")
            tee = gst.element_factory_make("tee")
            
            # Add & Link
            self.pipeline.add(videofunnel, tee, queue_ff,  sink)
            gst.element_link_many(videofunnel, tee, queue_ff, sink)
            pad.link(videofunnel.get_pad("sink%d"))    
                        
            sink.set_state(gst.STATE_PLAYING)
            queue_ff.set_state(gst.STATE_PLAYING)
            tee.set_state(gst.STATE_PLAYING)
            videofunnel.set_state(gst.STATE_PLAYING)
            self.pipeline.set_state(gst.STATE_PLAYING)
           
    def __async_handler (self, bus, message):
        """
        Check all bus message and redirect them to tf_channel (obligatory)
        """
        
        if self.tf_channel != None:
            self.tf_channel.bus_message(message)
        return True
    
##################################################
##################################################
if __name__ == '__main__':
    # Manage parameters
    if len(sys.argv) < 2:
        print "Usage: %s {master|slave} [--enable clutter]" % sys.argv[0]
        exit(1)
    
    if len(sys.argv) == 4 and sys.argv[2] == "--enable" and sys.argv[3] == "clutter":               
        USE_CLUTTER_TO_DISPLAY = True
    else:
        USE_CLUTTER_TO_DISPLAY = False
    
    
    if USE_CLUTTER_TO_DISPLAY:
        import clutter
        import cluttergst
        clutter.threads_init()
        clutter.init()
    
    ##
    ## Mode
    ##
    mode = sys.argv[1]    
    print "=== %s ===" % sys.argv[0]
    print "Mode: %s" % mode
    
    # Enable parameters due to mode
    if mode == "master":
        parameters = {"account": MASTER_ACCOUNT, "password": MASTER_PASSWORD,
                      "port": dbus.UInt32(5222), "server": MASTER_ACCOUNT.split("@")[-1]}
    else:
        parameters = {"account": SLAVE_ACCOUNT, "password": SLAVE_PASSWORD,
                      "port": dbus.UInt32(5222), "server": SLAVE_ACCOUNT.split("@")[-1]}
    
    ##
    ## Clutter
    ##
    if USE_CLUTTER_TO_DISPLAY:
        stage = clutter.Stage(default=True)
        stage.set_size(320, 240)
        stage.set_color(clutter.color_from_string('DarkSlateGrey'))
        
        video_texture = clutter.Texture()
        video_texture.show()
        stage.add(video_texture)
    
    ##
    ## Telepathy
    ##
    # Get the requested manager
    reg = telepathy.client.ManagerRegistry()
    reg.LoadManagers()    
    manager = reg.GetManager("gabble")
    
    # Request connection
    conn_bus_name, conn_object_path = manager[CONNECTION_MANAGER].\
                                            RequestConnection("jabber", parameters)             
    conn = telepathy.client.Connection(conn_bus_name, conn_object_path)
    
    # Listen signals
    conn[CONNECTION].connect_to_signal("NewChannel", on_new_channel_user)
    conn[CONNECTION].connect_to_signal("StatusChanged", on_status_changed_user)
    
    # Connect
    conn[CONNECTION].Connect()
    
    ##
    ## Test
    ##
    if USE_CLUTTER_TO_DISPLAY:
        stage.show_all()
        clutter.main()
    else:
        loop = gobject.MainLoop()
        try:
            loop.run()
        except KeyboardInterrupt:
            print 'interrupted'
------------------------------------------------------------------------------
Let Crystal Reports handle the reporting - Free Crystal Reports 2008 30-Day 
trial. Simplify your report design, integration and deployment - and focus on 
what you do best, core application coding. Discover what's new with
Crystal Reports now.  http://p.sf.net/sfu/bobj-july
_______________________________________________
Farsight-devel mailing list
Farsight-devel@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/farsight-devel

Reply via email to