Revision: 74
Author:   [email protected]
Date:     Sun Feb 10 12:42:20 2013
Log:      Support custom ASAO sampling rate;
Resampling pcm to SIP codec sampling rate;
Fixing bugs.
http://code.google.com/p/red5phone/source/detail?r=74

Added:
 /branches/red5sip/lib/libresample4j-1.0.jar
 /branches/red5sip/lib/sources
 /branches/red5sip/lib/sources/red5-1.0-sources.jar
 /branches/red5sip/src/java/org/red5/sip/app/BytesBuffer.java
Modified:
 /branches/red5sip/build.xml
 /branches/red5sip/settings.properties
 /branches/red5sip/src/java/org/red5/sip/app/Application.java
 /branches/red5sip/src/java/org/red5/sip/app/RTMPRoomClient.java
 /branches/red5sip/src/java/org/red5/sip/app/RTPStreamForMultiplex.java
 /branches/red5sip/src/java/org/red5/sip/app/RTPStreamMultiplexingSender.java

=======================================
--- /dev/null   
+++ /branches/red5sip/lib/libresample4j-1.0.jar Sun Feb 10 12:42:20 2013
Binary file, no diff available.
=======================================
--- /dev/null   
+++ /branches/red5sip/lib/sources/red5-1.0-sources.jar Sun Feb 10 12:42:20 2013
Binary file, no diff available.
=======================================
--- /dev/null
+++ /branches/red5sip/src/java/org/red5/sip/app/BytesBuffer.java Sun Feb 10 12:42:20 2013
@@ -0,0 +1,70 @@
+package org.red5.sip.app;
+
+public class BytesBuffer {
+    private final int buffersCount;
+    private final int arrayLength;
+    private byte[][] buffer;
+    private int[] bufLen;
+    private int start, end;
+    public BytesBuffer(int arrayLength, int buffersCount) {
+        this.buffersCount = buffersCount;
+        this.arrayLength = arrayLength;
+        this.buffer = new byte[buffersCount][arrayLength];
+        this.bufLen = new int[buffersCount];
+    }
+
+    protected void onBufferOverflow() {
+        clean();
+    }
+
+    protected void onBufferEmpty() {
+
+    }
+
+    public void clean() {
+        end = 0;
+        start = -1;
+    }
+
+    protected int available() {
+ return (end > start) ? (end - start) : (buffersCount - start + end);
+    }
+
+    protected float bufferUsage() {
+        return available() * 1.0f / buffersCount;
+    }
+
+    public void push(byte[] array, int offset, int length) {
+        if(end == start) {
+            onBufferOverflow();
+        }
+        if(arrayLength < length) {
+ throw new IllegalArgumentException("Array length too much: " + length);
+        }
+        System.arraycopy(array, offset, buffer[end], 0, length);
+        bufLen[end++] = length;
+        if(end == buffersCount) {
+            end = 0;
+        }
+        if(start == -1) {
+            start = 0;
+        }
+    }
+
+    public int take(byte[] dst, int offset) {
+        int res = -1;
+        if(start >= 0) {
+ System.arraycopy(buffer[start], 0, dst, offset, Math.min(bufLen[start], dst.length - offset));
+            res = bufLen[start++];
+            if(start == buffersCount) {
+                start = 0;
+            }
+            if(start == end) {
+                start = -1;
+                end = 0;
+                onBufferEmpty();
+            }
+        }
+        return res;
+    }
+}
=======================================
--- /branches/red5sip/build.xml Fri Feb 10 07:37:46 2012
+++ /branches/red5sip/build.xml Sun Feb 10 12:42:20 2013
@@ -85,6 +85,10 @@
     <pathelement location="${basedir}/lib/red5.jar"/>
     <pathelement location="${basedir}/lib/xmlrpc-2.0.1.jar"/>
   </path>
+
+    <path id="library.libresample4j.classpath">
+        <pathelement location="${basedir}/lib/libresample4j-1.0.jar"/>
+    </path>

   <path id="library.red5sip.classpath">
     <pathelement location="${basedir}/lib/red5sip.jar"/>
@@ -149,6 +153,7 @@
     <path refid="library.mjsip.classpath"/>
     <path refid="library.red5.classpath"/>
     <path refid="library.log.classpath"/>
+    <path refid="library.libresample4j.classpath"/>
     <path refid="library.apache-commons.classpath"/>
     <path refid="library.openmeetings.classpath"/>
     <path refid="library.springframwork.classpath"/>
@@ -160,6 +165,7 @@
     <path refid="library.mjsip.classpath"/>
     <path refid="library.red5.classpath"/>
     <path refid="library.log.classpath"/>
+    <path refid="library.libresample4j.classpath"/>
     <path refid="library.apache-commons.classpath"/>
     <path refid="library.openmeetings.classpath"/>
     <path refid="library.springframwork.classpath"/>
@@ -171,6 +177,7 @@
     <path refid="library.mjsip.classpath"/>
     <path refid="library.red5.classpath"/>
     <path refid="library.log.classpath"/>
+    <path refid="library.libresample4j.classpath"/>
     <path refid="library.apache-commons.classpath"/>
     <path refid="library.openmeetings.classpath"/>
     <path refid="library.springframwork.classpath"/>
@@ -183,6 +190,7 @@
     <path refid="library.mjsip.classpath"/>
     <path refid="library.red5.classpath"/>
     <path refid="library.log.classpath"/>
+    <path refid="library.libresample4j.classpath"/>
     <path refid="library.apache-commons.classpath"/>
     <path refid="library.openmeetings.classpath"/>
     <path refid="library.springframwork.classpath"/>
=======================================
--- /branches/red5sip/settings.properties       Mon Feb  4 23:34:05 2013
+++ /branches/red5sip/settings.properties       Sun Feb 10 12:42:20 2013
@@ -1,5 +1,6 @@
 red5.host=127.0.0.1
 red5.codec=asao
+red5.codec.rate=22
 sip.obproxy=127.0.0.1
 sip.phone=test
 sip.authid=test
=======================================
--- /branches/red5sip/src/java/org/red5/sip/app/Application.java Tue Jan 29 03:52:07 2013
+++ /branches/red5sip/src/java/org/red5/sip/app/Application.java Sun Feb 10 12:42:20 2013
@@ -63,6 +63,12 @@
             return;
         }
         props = PropertiesUtils.load(settings);
+        try {
+ RTPStreamMultiplexingSender.sampling = RTPStreamMultiplexingSender.SAMPLE_RATE.findByShortName(Integer.parseInt(props.getProperty("red5.codec.rate", "22")));
+        } catch (NumberFormatException e) {
+            log.error("Can't parse red5.codec.rate value", e);
+        }
+
     }

     public void init(DaemonContext daemonContext) throws Exception {
@@ -70,21 +76,34 @@
     }

     public void start() throws Exception {
- this.rtmpControlClient = new RTMPControlClient(props.getProperty("red5.host"), "openmeetings") {
-            @Override
-            protected void startRoomClient(int id) {
-                transportMap.put(id, createSIPTransport(props, id));
+        String roomsStr = props.getProperty("rooms",null);
+ if(props.getProperty("rooms.forceStart","no").equals("yes") && roomsStr != null) {
+            String[] rooms = roomsStr.split(",");
+            for(String room: rooms) {
+                try {
+                    int id = Integer.parseInt(room);
+                    transportMap.put(id, createSIPTransport(props, id));
+                } catch (NumberFormatException e) {
+ log.error("Room id parsing error: id=\"" + room + "\"");
+                }
             }
+        } else {
+ this.rtmpControlClient = new RTMPControlClient(props.getProperty("red5.host"), "openmeetings") {
+                @Override
+                protected void startRoomClient(int id) {
+                    transportMap.put(id, createSIPTransport(props, id));
+                }

-            @Override
-            protected void stopRoomClient(int id) {
-                SIPTransport t = transportMap.remove(id);
-                if(t != null) {
-                    t.close();
+                @Override
+                protected void stopRoomClient(int id) {
+                    SIPTransport t = transportMap.remove(id);
+                    if(t != null) {
+                        t.close();
+                    }
                 }
-            }
-        };
-        this.rtmpControlClient.start();
+            };
+            this.rtmpControlClient.start();
+        }
     }

     public void stop() throws Exception {
=======================================
--- /branches/red5sip/src/java/org/red5/sip/app/RTMPRoomClient.java Wed Feb 6 09:31:44 2013
+++ /branches/red5sip/src/java/org/red5/sip/app/RTMPRoomClient.java Sun Feb 10 12:42:20 2013
@@ -147,6 +147,7 @@
     private void createPlayStream( long broadCastId ) {

         log.debug( "create play stream" );
+        broadcastIds.add((int)broadCastId);
IPendingServiceCallback wrapper = new CreatePlayStreamCallBack(broadCastId);
         invoke( "createStream", null, wrapper );
     }
@@ -338,6 +339,9 @@

     public void newStream(Client client) {
         log.debug("newStream:" + client.getBroadCastID());
+        if(broadcastIds.contains((int)client.getBroadCastID())) {
+            closeStream(client);
+        }
         createPlayStream(client.getBroadCastID());
     }

=======================================
--- /branches/red5sip/src/java/org/red5/sip/app/RTPStreamForMultiplex.java Wed Feb 6 09:31:44 2013
+++ /branches/red5sip/src/java/org/red5/sip/app/RTPStreamForMultiplex.java Sun Feb 10 12:42:20 2013
@@ -1,73 +1,63 @@
 package org.red5.sip.app;

+import org.red5.codecs.asao.DecoderMap;
 import org.red5.logging.Red5LoggerFactory;
 import org.slf4j.Logger;

+import static org.red5.sip.app.RTPStreamMultiplexingSender.NELLYMOSER_ENCODED_PACKET_SIZE;
+
 public class RTPStreamForMultiplex implements IMediaStream {
protected static Logger log = Red5LoggerFactory.getLogger(RTPStreamForMultiplex.class, "sip");

     private int streamId;
-    private RTPStreamMultiplexingSender sender;
-    private long syncSource;
-    private static int BUFFERS_COUNT = 1024;
-    private byte[][] buffer = new byte[BUFFERS_COUNT][65];
-    private int[] bufLen = new int[BUFFERS_COUNT];
-    private int start, end;
     private boolean ready = false;
+    protected DecoderMap decoderMap = null;
+ private BytesBuffer buffer = new BytesBuffer(NELLYMOSER_ENCODED_PACKET_SIZE, 200) {
+        @Override
+        protected void onBufferOverflow() {
+            super.onBufferOverflow();
+            log.error("Stream %d buffer overflow. Buffer is cleared");
+        }

- protected RTPStreamForMultiplex(int streamId, long syncSource, RTPStreamMultiplexingSender sender) {
+        @Override
+        protected void onBufferEmpty() {
+            super.onBufferEmpty();
+            ready = false;
+        }
+    };
+
+    protected RTPStreamForMultiplex(int streamId) {
         this.streamId = streamId;
-        this.sender = sender;
-        this.syncSource = syncSource;
-        end = 0;
-        start = -1;
     }

     public int getStreamId() {
         return streamId;
     }

- public synchronized void send(long timestamp, byte[] asaoBuffer, int offset, int num) {
-        if(end == start) {
- log.error("Stream buffer overflow: streamId: " + streamId + ", start: " + start + ", end: " + end);
-            return;
-        }
-        System.arraycopy(asaoBuffer, 0, buffer[end], 0, asaoBuffer.length);
-        bufLen[end++] = num;
-        if(end == BUFFERS_COUNT) {
-            end = 0;
-        }
-        if(start == -1) {
-            start = 0;
+ public void send(long timestamp, byte[] asaoBuffer, int offset, int num) {
+        System.out.println("Stream " + streamId + " send");
+        for(int i=0;i<num;i+=NELLYMOSER_ENCODED_PACKET_SIZE) {
+            synchronized (this) {
+ buffer.push(asaoBuffer, offset+i, NELLYMOSER_ENCODED_PACKET_SIZE);
+            }
+            Thread.yield();
         }
-
-        if(!ready && available() > 10) {
-            ready = true;
+        synchronized (this) {
+            if(!ready && buffer.bufferUsage() > 0.2) {
+                ready = true;
+            }
         }
     }
-
-    protected synchronized int available() {
- return (end > start) ? (end - start) : (BUFFERS_COUNT - start + end);
-    }

     protected synchronized boolean ready() {
         return ready;
     }
+
+    protected synchronized float bufferUsage() {
+        return buffer.bufferUsage();
+    }

     protected synchronized int read(byte[] dst, int offset) {
-        int res = -1;
-        if(start >= 0) {
-            System.arraycopy(buffer[start], 0, dst, offset, dst.length);
-            res = start++;
-            if(start == BUFFERS_COUNT) {
-                start = 0;
-            }
-            if(start == end) {
-                start = -1;
-                end = 0;
-                ready = false;
-            }
-        }
-        return res;
+        return buffer.take(dst, offset);
     }
 }
=======================================
--- /branches/red5sip/src/java/org/red5/sip/app/RTPStreamMultiplexingSender.java Wed Feb 6 09:31:44 2013
+++ /branches/red5sip/src/java/org/red5/sip/app/RTPStreamMultiplexingSender.java Sun Feb 10 12:42:20 2013
@@ -1,5 +1,6 @@
 package org.red5.sip.app;

+import com.laszlosystems.libresample4j.Resampler;
 import local.net.RtpPacket;
 import local.net.RtpSocket;
 import org.apache.mina.util.ConcurrentHashSet;
@@ -20,9 +21,36 @@

     public static int RTP_HEADER_SIZE = 12;

-    protected static final int NELLYMOSER_DECODED_PACKET_SIZE = 256;
+    public static enum SAMPLE_RATE {
+        SAMPLING_8000(8000, 8),
+        SAMPLING_16000(16000, 16),
+        SAMPLING_32000(32000, 32),
+        SAMPLING_48000(48000, 48),
+        SAMPLING_11025(11025, 11),
+        SAMPLING_22050(22050, 22),
+        SAMPLING_44100(44100, 44);

-    protected static final int NELLYMOSER_ENCODED_PACKET_SIZE = 64;
+        public int rate;
+        public int shortName;
+        SAMPLE_RATE(int rate, int shortName) {
+            this.rate = rate;
+            this.shortName = shortName;
+        }
+
+        public static SAMPLE_RATE findByShortName(int shortName) {
+            for(SAMPLE_RATE s: SAMPLE_RATE.values()) {
+                if(s.shortName == shortName) {
+                    return s;
+                }
+            }
+            throw new IllegalArgumentException("Invalid sample rate");
+        }
+    }
+
+    protected static SAMPLE_RATE sampling = SAMPLE_RATE.SAMPLING_22050;
+
+    protected static final int NELLYMOSER_DECODED_PACKET_SIZE = 256;// * sampling.blocks;
+    protected static final int NELLYMOSER_ENCODED_PACKET_SIZE = 64;// * sampling.blocks;

     RtpSocket rtpSocket = null;

@@ -46,26 +74,19 @@
     private int seqn = 0;

     private long time = 0;
-    private long syncSource = 0;

     // Temporary buffer with received PCM audio from FlashPlayer.
-    float[] tempBuffer;
+    private float[] tempBuffer;
+    private float[] multiplexedBuffer;

-    float[] decodedBuffer1;
-    float[] decodedBuffer2;
-
-    volatile int multiplexingCount = 0;
+    private Resampler resampler;
+    private double factor = 1;

     private Thread sendThread = new Thread(this);

     ConcurrentHashSet<WeakReference<RTPStreamForMultiplex>> streamSet = new ConcurrentHashSet<WeakReference<RTPStreamForMultiplex>>();
     //Set<RTPStreamForMultiplex> streamSet = Collections.synchronizedSet(new WeakHashSet<RTPStreamForMultiplex>());

-    private static int SEND_BUFFER_MAX = 100;
-    private byte[][] sendBuffer = new byte[SEND_BUFFER_MAX][];
-    private int[] sendBufferLength = new int[SEND_BUFFER_MAX];
-    private int sendBufferPos = 0;
-
     // Floats remaining on temporary buffer.
     int tempBufferRemaining = 0;

@@ -168,11 +189,6 @@
         mediaReceiver.setSender(this);
         this.sipCodec = sipCodec;
         this.doSync = do_sync;
-        this.sendBufferPos = 0;
-
-        for(int i=0; i<SEND_BUFFER_MAX; i++) {
- sendBuffer[i] = new byte[sipCodec.getOutgoingEncodedFrameSize() + RTP_HEADER_SIZE];
-        }

         try {
             if (src_socket == null) {
@@ -194,24 +210,22 @@

         seqn = 0;
         time = 0;
-        syncSource = 0;

println("start()", "using blocks of " + (packetBuffer.length - RTP_HEADER_SIZE) + " bytes.");

         decoder = new Decoder();
-        decoderMap = null;

         sendThread.start();
     }

     private void fillDecodedBuffer(byte[] asaoBuffer, float[] tempBuffer) {
         ByteStream audioStream = new ByteStream(asaoBuffer, 1, NELLYMOSER_ENCODED_PACKET_SIZE);
-        decoderMap = decoder.decode(decoderMap, audioStream.bytes, 1, tempBuffer, 0);
+        decoderMap = decoder.decode(decoderMap, audioStream.bytes, 0, tempBuffer, 0);
         //ResampleUtils.normalize(tempBuffer, tempBuffer.length);
     }

-    public synchronized IMediaStream createStream(int streamId) {
- RTPStreamForMultiplex stream = new RTPStreamForMultiplex(streamId, syncSource, this);
+    public IMediaStream createStream(int streamId) {
+        RTPStreamForMultiplex stream = new RTPStreamForMultiplex(streamId);
         streamSet.add(new WeakReference<RTPStreamForMultiplex>(stream));
         return stream;
     }
@@ -219,15 +233,30 @@
     public void deleteStream(int streamId) {
for (Iterator<WeakReference<RTPStreamForMultiplex>> iterator = streamSet.iterator(); iterator.hasNext(); ) {
             WeakReference<RTPStreamForMultiplex> ref = iterator.next();
- if (ref.get() != null && ref.get().getStreamId() == streamId ) {
-                iterator.remove();
+            RTPStreamForMultiplex stream = ref.get();
+            try {
+                if (stream != null && stream.getStreamId() == streamId ) {
+                    iterator.remove();
+                }
+            } catch (NullPointerException ignored) {
+
             }
         }
     }

-    private void doRtpDelay() {
+    private void doRtpDelay(float bufferUsage) {
+        //TODO: make proper pause correction.
+ //Pause should not exceed packetization and prevent emtying of buffer
         try {
-            Thread.sleep( sipCodec.getOutgoingPacketization() - 2);
+            long pause = sipCodec.getOutgoingPacketization() - 1;
+            if(bufferUsage > .5f) {
+                pause -= 5;
+            }
+            if(bufferUsage > .2f) {
+                pause -= 1;
+            }
+            System.out.println("Sleep pause: " + pause);
+            Thread.sleep( pause, 800000 );
         }
         catch ( Exception e ) {
         }
@@ -235,48 +264,65 @@

     public void run() {
         if (!hasInitilializedBuffers) {
-            tempBuffer = new float[NELLYMOSER_DECODED_PACKET_SIZE];
-            decodedBuffer1 = new float[NELLYMOSER_DECODED_PACKET_SIZE];
-            decodedBuffer2 = new float[NELLYMOSER_DECODED_PACKET_SIZE];
+            multiplexedBuffer = new float[NELLYMOSER_DECODED_PACKET_SIZE];
encodingBuffer = new float[sipCodec.getOutgoingDecodedFrameSize()];
-            multiplexingCount = 0;
+
+            if(sipCodec.getSampleRate() == sampling.rate) {
+                tempBuffer = new float[NELLYMOSER_DECODED_PACKET_SIZE];
+                resampler = null;
+            } else {
+                factor = sipCodec.getSampleRate() / (double)sampling.rate;
+                resampler = new Resampler(true, factor, factor);
+ tempBuffer = new float[(int)(NELLYMOSER_DECODED_PACKET_SIZE * factor)];
+            }
             hasInitilializedBuffers = true;
         }
-        byte[] asaoBuffer = new byte[65];
+
+        float[] decodedBuffer = new float[NELLYMOSER_DECODED_PACKET_SIZE];
+        byte[] asaoBuffer = new byte[NELLYMOSER_ENCODED_PACKET_SIZE];
+
+        int disableStream = 0;
+
         while(rtpSocket != null) {
-            multiplexingCount = 0;
- for(Iterator<WeakReference<RTPStreamForMultiplex>> i = streamSet.iterator(); i.hasNext();) {
-                int len = -1;
-                WeakReference<RTPStreamForMultiplex> ref = i.next();
-                RTPStreamForMultiplex stream = ref.get();
-                if(stream != null) {
-                    synchronized (stream) {
- System.out.println(String.format("Stream id %d, avail %d", stream.getStreamId(), stream.available()));
-                        if(stream != null && stream.ready()) {
+            float bufferUsage = 0;
+            int multiplexingCount = 0;
+            try {
+ for(Iterator<WeakReference<RTPStreamForMultiplex>> i = streamSet.iterator(); i.hasNext();) {
+                    int len = -1;
+                    WeakReference<RTPStreamForMultiplex> ref = i.next();
+                    RTPStreamForMultiplex stream = ref.get();
+                    if(stream != null) {
+ if(stream.ready() && disableStream != stream.getStreamId()) {
                             len = stream.read(asaoBuffer, 0);
+                            bufferUsage = Math.max(bufferUsage, stream.bufferUsage());
+                            System.out.println(String.format("Stream id %d, buffer %f", stream.getStreamId(), stream.bufferUsage()));
                         } else {
-                            break;
+                            continue;
                         }
+                    } else {
+                        i.remove();
+                        continue;
                     }
-                } else {
-                    i.remove();
-                    break;
-                }
-                if(len != -1) {
-                    fillDecodedBuffer(asaoBuffer, decodedBuffer1);
-                    if (multiplexingCount > 0) {
- ResampleUtils.multiplex(decodedBuffer2, decodedBuffer1);
-                    } else {
- BufferUtils.floatBufferIndexedCopy(decodedBuffer2, 0, decodedBuffer1, 0, decodedBuffer1.length);
+                    if(len != -1) {
+                        ByteStream audioStream = new ByteStream(asaoBuffer, 1, NELLYMOSER_ENCODED_PACKET_SIZE);
+                        stream.decoderMap = decoder.decode(stream.decoderMap, audioStream.bytes, 0, decodedBuffer, 0);
+                        //fillDecodedBuffer(asaoBuffer, decodedBuffer);
+                        if (multiplexingCount > 0) {
+ ResampleUtils.multiplex(multiplexedBuffer, decodedBuffer);
+                        } else {
+ System.arraycopy(decodedBuffer, 0, multiplexedBuffer, 0, decodedBuffer.length);
+                        }
+                        multiplexingCount++;
                     }
-                    multiplexingCount++;
-                    //System.out.println("Multiplex stream: " + streamId);
+                    Thread.yield();
                 }
-                Thread.yield();
+            } catch (Exception e) {
+                log.error("Exception", e);
             }
             if(multiplexingCount > 0) {
-                //System.out.println("Send: multiplexed: " + multiplexingCount + ", total streams: " + streamSet.size());
-                ResampleUtils.normalize(decodedBuffer2, decodedBuffer2.length);
+                System.out.println("Send: multiplexed: " + multiplexingCount + ", total streams: " + streamSet.size());
+//                ResampleUtils.normalize(multiplexedBuffer, 1.0f/multiplexingCount);
+                ResampleUtils.normalize(multiplexedBuffer, multiplexedBuffer.length);
                 try {
                     asao_buffer_processed = false;
                     do {
@@ -286,7 +332,8 @@
                         }
if (encodingOffset == sipCodec.getOutgoingDecodedFrameSize()) {
                             rtpSocketSend(rtpPacket);
-                            doRtpDelay();
+                            doRtpDelay(bufferUsage);
+ //System.out.println("rtpSocketSend, bufferUsage: " + bufferUsage);
                             encodingOffset = 0;
                         }
                     }
@@ -339,10 +386,13 @@

                 }

-                // Decode new asao packet.
-
+                // Process next buffer
                 asao_buffer_processed = true;
- BufferUtils.floatBufferIndexedCopy(tempBuffer, 0, decodedBuffer2, 0, decodedBuffer2.length);
+                if(resampler == null) {
+ System.arraycopy(multiplexedBuffer, 0, tempBuffer, 0, tempBuffer.length);
+                } else {
+ resampler.process(factor, multiplexedBuffer, 0, multiplexedBuffer.length, true, tempBuffer, 0, tempBuffer.length);
+                }
// tempBuffer = ResampleUtils.normalize(tempBuffer, 256); // normalise volume

                 tempBufferRemaining = tempBuffer.length;
@@ -404,8 +454,6 @@

         println("halt", "Terminated");
     }
-
-    private long lastMS = System.currentTimeMillis();

     private void rtpSocketSend(RtpPacket rtpPacket) {
         try {

Reply via email to