This is an automated email from the ASF dual-hosted git repository.

solomax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openmeetings.git


The following commit(s) were added to refs/heads/master by this push:
     new 693bf74  [OPENMEETINGS-2492] pipeline is created per KStream
693bf74 is described below

commit 693bf7496a9c6e1dbcc5ba32e736dc09b1c9f321
Author: Maxim Solodovnik <solomax...@gmail.com>
AuthorDate: Fri Oct 23 14:39:12 2020 +0700

    [OPENMEETINGS-2492] pipeline is created per KStream
---
 .../org/apache/openmeetings/core/remote/KRoom.java | 30 +-------
 .../apache/openmeetings/core/remote/KStream.java   | 69 ++++++++++++++---
 .../openmeetings/core/remote/KurentoHandler.java   | 88 +++++++++++++---------
 .../openmeetings/core/remote/StreamProcessor.java  | 25 +++---
 .../apache/openmeetings/core/sip/SipManager.java   |  2 +-
 .../core/remote/TestRecordingFlowMocked.java       |  4 +-
 .../apache/openmeetings/web/app/ClientManager.java |  8 +-
 .../apache/openmeetings/web/app/TimerService.java  |  2 +-
 .../apache/openmeetings/web/room/RoomPanel.java    |  6 +-
 .../org/apache/openmeetings/web/room/raw-video.js  | 33 ++++----
 .../openmeetings/web/app/TestApplication.java      |  2 +-
 11 files changed, 154 insertions(+), 115 deletions(-)

diff --git 
a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KRoom.java
 
b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KRoom.java
index c7a8709..8215b37 100644
--- 
a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KRoom.java
+++ 
b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KRoom.java
@@ -25,7 +25,6 @@ import static java.util.UUID.randomUUID;
 import static org.apache.openmeetings.core.remote.KurentoHandler.PARAM_ICE;
 import static org.apache.openmeetings.core.remote.KurentoHandler.newKurentoMsg;
 
-import java.util.Collection;
 import java.util.Date;
 import java.util.Optional;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -46,8 +45,6 @@ import org.apache.openmeetings.db.manager.IClientManager;
 import org.apache.openmeetings.db.util.FormatHelper;
 import org.apache.openmeetings.db.util.ws.RoomMessage;
 import org.apache.openmeetings.db.util.ws.TextRoomMessage;
-import org.kurento.client.Continuation;
-import org.kurento.client.MediaPipeline;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -67,7 +64,6 @@ public class KRoom {
        private final StreamProcessor processor;
        private final RecordingChunkDao chunkDao;
        private final IApplication app;
-       private final MediaPipeline pipeline;
        private final Long roomId;
        private final Room.Type type;
        private final AtomicBoolean recordingStarted = new AtomicBoolean(false);
@@ -76,13 +72,12 @@ public class KRoom {
        private JSONObject recordingUser = new JSONObject();
        private JSONObject sharingUser = new JSONObject();
 
-       public KRoom(KurentoHandler handler, Room r, MediaPipeline pipeline) {
+       public KRoom(KurentoHandler handler, Room r) {
                this.processor = handler.getStreamProcessor();
                this.chunkDao = handler.getChunkDao();
                this.app = handler.getApp();
                this.roomId = r.getId();
                this.type = r.getType();
-               this.pipeline = pipeline;
                log.info("ROOM {} has been created", roomId);
        }
 
@@ -98,25 +93,17 @@ public class KRoom {
                return recordingId;
        }
 
-       public MediaPipeline getPipeline() {
-               return pipeline;
-       }
-
        public RecordingChunkDao getChunkDao() {
                return chunkDao;
        }
 
-       public KStream join(final StreamDesc sd) {
+       public KStream join(final StreamDesc sd, KurentoHandler kHandler) {
                log.info("ROOM {}: join client {}, stream: {}", roomId, 
sd.getClient(), sd.getUid());
-               final KStream stream = new KStream(sd, this);
+               final KStream stream = new KStream(sd, this, kHandler);
                processor.addStream(stream);
                return stream;
        }
 
-       public Collection<KStream> getParticipants() {
-               return processor.getByRoom(this.getRoomId());
-       }
-
        public void onStopBroadcast(KStream stream) {
                processor.release(stream, true);
                WebSocketHelper.sendAll(newKurentoMsg()
@@ -269,17 +256,6 @@ public class KRoom {
                processor.getByRoom(this.getRoomId()).forEach(
                                stream -> stream.release(processor)
                );
-               pipeline.release(new Continuation<Void>() {
-                       @Override
-                       public void onSuccess(Void result) throws Exception {
-                               log.trace("ROOM {}: Released Pipeline", 
KRoom.this.roomId);
-                       }
-
-                       @Override
-                       public void onError(Throwable cause) throws Exception {
-                               log.warn("PARTICIPANT {}: Could not release 
Pipeline", KRoom.this.roomId);
-                       }
-               });
                log.debug("Room {} closed", this.roomId);
        }
 }
diff --git 
a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KStream.java
 
b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KStream.java
index 754ee15..930d4ea 100644
--- 
a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KStream.java
+++ 
b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KStream.java
@@ -49,6 +49,7 @@ import org.apache.openmeetings.db.util.ws.TextRoomMessage;
 import org.kurento.client.Continuation;
 import org.kurento.client.IceCandidate;
 import org.kurento.client.MediaFlowState;
+import org.kurento.client.MediaPipeline;
 import org.kurento.client.MediaProfileSpecType;
 import org.kurento.client.MediaType;
 import org.kurento.client.RecorderEndpoint;
@@ -62,38 +63,49 @@ import com.github.openjson.JSONObject;
 public class KStream extends AbstractStream {
        private static final Logger log = 
LoggerFactory.getLogger(KStream.class);
 
+       private final KurentoHandler kHandler;
        private final KRoom room;
        private final Date connectedSince;
        private final StreamType streamType;
        private MediaProfileSpecType profile;
+       private MediaPipeline pipeline;
        private RecorderEndpoint recorder;
        private WebRtcEndpoint outgoingMedia = null;
        private final ConcurrentMap<String, WebRtcEndpoint> listeners = new 
ConcurrentHashMap<>();
        private Optional<CompletableFuture<Object>> flowoutFuture = 
Optional.empty();
        private Long chunkId;
        private Type type;
+       private boolean hasAudio;
+       private boolean hasVideo;
+       private boolean hasScreen;
 
-       public KStream(final StreamDesc sd, KRoom room) {
+       public KStream(final StreamDesc sd, KRoom room, KurentoHandler 
kHandler) {
                super(sd.getSid(), sd.getUid());
                this.room = room;
                streamType = sd.getType();
                this.connectedSince = new Date();
+               this.kHandler = kHandler;
                //TODO Min/MaxVideoSendBandwidth
                //TODO Min/Max Audio/Video RecvBandwidth
        }
 
-       public KStream startBroadcast(final StreamProcessor processor, final 
StreamDesc sd, final String sdpOffer) {
+       public void startBroadcast(
+                       final StreamProcessor processor
+                       , final StreamDesc sd
+                       , final String sdpOffer
+                       , Runnable then)
+       {
                if (outgoingMedia != null) {
                        release(processor, false);
                }
-               final boolean hasAudio = sd.hasActivity(Activity.AUDIO);
-               final boolean hasVideo = sd.hasActivity(Activity.VIDEO);
-               final boolean hasScreen = sd.hasActivity(Activity.SCREEN);
+               hasAudio = sd.hasActivity(Activity.AUDIO);
+               hasVideo = sd.hasActivity(Activity.VIDEO);
+               hasScreen = sd.hasActivity(Activity.SCREEN);
                if ((sdpOffer.indexOf("m=audio") > -1 && !hasAudio)
                                || (sdpOffer.indexOf("m=video") > -1 && 
!hasVideo && StreamType.SCREEN != streamType))
                {
                        log.warn("Broadcast started without enough rights");
-                       return this;
+                       return;
                }
                if (StreamType.SCREEN == streamType) {
                        type = Type.SCREEN;
@@ -119,6 +131,25 @@ public class KStream extends AbstractStream {
                                profile = MediaProfileSpecType.WEBM_VIDEO_ONLY;
                                break;
                }
+               pipeline = kHandler.createPipiline(room.getRoomId(), 
sd.getUid(), new Continuation<Void>() {
+                       @Override
+                       public void onSuccess(Void result) throws Exception {
+                               internalStartBroadcast(processor, sd, sdpOffer);
+                               then.run();
+                       }
+
+                       @Override
+                       public void onError(Throwable cause) throws Exception {
+                               log.warn("Unable to create pipeline {}", 
KStream.this.uid, cause);
+                       }
+               });
+       }
+
+       private void internalStartBroadcast(
+                       final StreamProcessor processor
+                       , final StreamDesc sd
+                       , final String sdpOffer)
+       {
                outgoingMedia = createEndpoint(processor, sd.getSid(), 
sd.getUid());
                outgoingMedia.addMediaSessionTerminatedListener(evt -> 
log.warn("Media stream terminated {}", sd));
                outgoingMedia.addMediaFlowOutStateChangeListener(evt -> {
@@ -151,10 +182,9 @@ public class KStream extends AbstractStream {
                if (hasAudio || hasVideo || hasScreen) {
                        WebSocketHelper.sendRoomOthers(room.getRoomId(), 
c.getUid(), newKurentoMsg()
                                        .put("id", "newStream")
-                                       .put(PARAM_ICE, 
processor.getHandler().getTurnServers(c))
+                                       .put(PARAM_ICE, 
kHandler.getTurnServers(c))
                                        .put("stream", sd.toJson()));
                }
-               return this;
        }
 
        public void addListener(final StreamProcessor processor, String sid, 
String uid, String sdpOffer) {
@@ -172,7 +202,7 @@ public class KStream extends AbstractStream {
                log.debug("gather candidates");
                endpoint.gatherCandidates(); // this one might throw Exception
                log.trace("USER {}: SdpAnswer is {}", this.uid, sdpAnswer);
-               processor.getHandler().sendClient(sid, newKurentoMsg()
+               kHandler.sendClient(sid, newKurentoMsg()
                                .put("id", "videoResponse")
                                .put("uid", this.uid)
                                .put("sdpAnswer", sdpAnswer));
@@ -215,11 +245,11 @@ public class KStream extends AbstractStream {
        }
 
        private WebRtcEndpoint createEndpoint(final StreamProcessor processor, 
String sid, String uid) {
-               WebRtcEndpoint endpoint = 
createWebRtcEndpoint(room.getPipeline());
+               WebRtcEndpoint endpoint = createWebRtcEndpoint(pipeline);
                endpoint.addTag("outUid", this.uid);
                endpoint.addTag("uid", uid);
 
-               endpoint.addIceCandidateFoundListener(evt -> 
processor.getHandler().sendClient(sid
+               endpoint.addIceCandidateFoundListener(evt -> 
kHandler.sendClient(sid
                                , newKurentoMsg()
                                        .put("id", "iceCandidate")
                                        .put("uid", KStream.this.uid)
@@ -235,7 +265,7 @@ public class KStream extends AbstractStream {
                        return;
                }
                final String chunkUid = "rec_" + room.getRecordingId() + "_" + 
randomUUID();
-               recorder = createRecorderEndpoint(room.getPipeline(), 
getRecUri(getRecordingChunk(room.getRoomId(), chunkUid)), profile);
+               recorder = createRecorderEndpoint(pipeline, 
getRecUri(getRecordingChunk(room.getRoomId(), chunkUid)), profile);
                recorder.addTag("outUid", uid);
                recorder.addTag("uid", uid);
 
@@ -335,6 +365,17 @@ public class KStream extends AbstractStream {
                                        log.warn("PARTICIPANT {}: Could not 
release", KStream.this.uid, cause);
                                }
                        });
+                       pipeline.release(new Continuation<Void>() {
+                               @Override
+                               public void onSuccess(Void result) throws 
Exception {
+                                       log.trace("PARTICIPANT {}: Released 
Pipeline", KStream.this.uid);
+                               }
+
+                               @Override
+                               public void onError(Throwable cause) throws 
Exception {
+                                       log.warn("PARTICIPANT {}: Could not 
release Pipeline", KStream.this.uid, cause);
+                               }
+                       });
                        releaseRecorder(false);
                        outgoingMedia = null;
                }
@@ -423,6 +464,10 @@ public class KStream extends AbstractStream {
                return room;
        }
 
+       MediaPipeline getPipeline() {
+               return pipeline;
+       }
+
        public StreamType getStreamType() {
                return streamType;
        }
diff --git 
a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KurentoHandler.java
 
b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KurentoHandler.java
index 5f1214e..a43f62c 100644
--- 
a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KurentoHandler.java
+++ 
b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KurentoHandler.java
@@ -56,6 +56,7 @@ import org.apache.openmeetings.db.manager.IClientManager;
 import org.apache.openmeetings.db.util.ws.RoomMessage;
 import org.apache.openmeetings.db.util.ws.TextRoomMessage;
 import org.apache.wicket.util.string.Strings;
+import org.kurento.client.Continuation;
 import org.kurento.client.Endpoint;
 import org.kurento.client.EventListener;
 import org.kurento.client.KurentoClient;
@@ -87,6 +88,7 @@ public class KurentoHandler {
        public static final String TAG_KUID = "kuid";
        public static final String TAG_MODE = "mode";
        public static final String TAG_ROOM = "roomId";
+       public static final String TAG_STREAM_UID = "streamUid";
        private static final String HMAC_SHA1_ALGORITHM = "HmacSHA1";
        private final ScheduledExecutorService kmsRecheckScheduler = 
Executors.newScheduledThreadPool(1);
        public static final String KURENTO_TYPE = "kurento";
@@ -296,19 +298,22 @@ public class KurentoHandler {
                streamProcessor.remove((Client)c);
        }
 
+       MediaPipeline createPipiline(Long roomId, String uid, 
Continuation<Void> continuation) {
+               Transaction t = beginTransaction();
+               MediaPipeline pipe = client.createMediaPipeline(t);
+               pipe.addTag(t, TAG_KUID, kuid);
+               pipe.addTag(t, TAG_ROOM, String.valueOf(roomId));
+               pipe.addTag(t, TAG_STREAM_UID, uid);
+               t.commit(continuation);
+               return pipe;
+       }
+
        KRoom getRoom(Long roomId) {
-               log.debug("Searching for room {}", roomId);
                KRoom room = rooms.computeIfAbsent(roomId, k -> {
                        log.debug("Room {} does not exist. Will create now!", 
roomId);
                        Room r = roomDao.get(roomId);
-                       Transaction t = beginTransaction();
-                       MediaPipeline pipe = client.createMediaPipeline(t);
-                       pipe.addTag(t, TAG_KUID, kuid);
-                       pipe.addTag(t, TAG_ROOM, String.valueOf(roomId));
-                       t.commit();
-                       return new KRoom(this, r, pipe);
+                       return new KRoom(this, r);
                });
-               log.debug("Room {} found!", roomId);
                return room;
        }
 
@@ -440,24 +445,30 @@ public class KurentoHandler {
                                        // still alive
                                        MediaPipeline pipe = 
client.getById(roid, MediaPipeline.class);
                                        Map<String, String> tags = 
tagsAsMap(pipe);
-                                       final String inKuid = 
tags.get(TAG_KUID);
-                                       if (ignoredKuids.contains(inKuid)) {
-                                               return;
-                                       }
-                                       if (validTestPipeline(tags)) {
-                                               return;
-                                       }
-                                       if (kuid.equals(inKuid)) {
-                                               KRoom r = 
rooms.get(Long.valueOf(tags.get(TAG_ROOM)));
-                                               if 
(r.getPipeline().getId().equals(pipe.getId())) {
+                                       try {
+                                               final String inKuid = 
tags.get(TAG_KUID);
+                                               if (inKuid != null && 
ignoredKuids.contains(inKuid)) {
                                                        return;
-                                               } else if (r != null) {
-                                                       
rooms.remove(r.getRoomId());
-                                                       r.close();
                                                }
+                                               if (validTestPipeline(tags)) {
+                                                       return;
+                                               }
+                                               if (kuid.equals(inKuid)) {
+                                                       KStream stream = 
streamProcessor.getByUid(tags.get(TAG_STREAM_UID));
+                                                       if (stream != null) {
+                                                               if 
(stream.getRoom().getRoomId().equals(Long.valueOf(tags.get(TAG_ROOM)))
+                                                                               
&& stream.getPipeline().getId().equals(pipe.getId()))
+                                                               {
+                                                                       return;
+                                                               } else {
+                                                                       
stream.release(streamProcessor);
+                                                               }
+                                                       }
+                                               }
+                                       } catch (Throwable e) {
+                                               log.warn("Invalid MediaPipeline 
{} detected, will be dropped, tags: {}", pipe.getId(), tags);
+                                               pipe.release();
                                        }
-                                       log.warn("Invalid MediaPipeline {} 
detected, will be dropped, tags: {}", pipe.getId(), tags);
-                                       pipe.release();
                                }, objCheckTimeout, MILLISECONDS);
                        } else if (evt.getObject() instanceof Endpoint) {
                                // endpoint created
@@ -478,22 +489,25 @@ public class KurentoHandler {
                                        }
                                        // still alive
                                        Endpoint point = client.getById(eoid, 
fClazz);
-                                       Map<String, String> pipeTags = 
tagsAsMap(point.getMediaPipeline());
-                                       final String inKuid = 
pipeTags.get(TAG_KUID);
-                                       if (ignoredKuids.contains(inKuid)) {
-                                               return;
-                                       }
-                                       if (validTestPipeline(pipeTags)) {
-                                               return;
-                                       }
                                        Map<String, String> tags = 
tagsAsMap(point);
-                                       KStream stream = 
streamProcessor.getByUid(tags.get("outUid"));
-                                       log.debug("New Endpoint {} detected, 
tags: {}, kStream: {}", point.getId(), tags, stream);
-                                       if (stream != null && 
stream.contains(tags.get("uid"))) {
-                                               return;
+                                       try {
+                                               Map<String, String> pipeTags = 
tagsAsMap(point.getMediaPipeline());
+                                               final String inKuid = 
pipeTags.get(TAG_KUID);
+                                               if 
(ignoredKuids.contains(inKuid)) {
+                                                       return;
+                                               }
+                                               if 
(validTestPipeline(pipeTags)) {
+                                                       return;
+                                               }
+                                               KStream stream = 
streamProcessor.getByUid(tags.get("outUid"));
+                                               
log.debug("Kurento::ObjectCreated -> New Endpoint {} detected, tags: {}, 
kStream: {}", point.getId(), tags, stream);
+                                               if (stream != null && 
stream.contains(tags.get("uid"))) {
+                                                       return;
+                                               }
+                                       } catch (Throwable e) {
+                                               
log.warn("Kurento::ObjectCreated -> Invalid Endpoint {} detected, will be 
dropped, tags: {}", point.getId(), tags);
+                                               point.release();
                                        }
-                                       log.warn("Invalid Endpoint {} detected, 
will be dropped, tags: {}", point.getId(), tags);
-                                       point.release();
                                }, objCheckTimeout, MILLISECONDS);
                        }
                }
diff --git 
a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/StreamProcessor.java
 
b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/StreamProcessor.java
index 27fbe41..5e4635a 100644
--- 
a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/StreamProcessor.java
+++ 
b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/StreamProcessor.java
@@ -32,7 +32,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import org.apache.openmeetings.core.converter.IRecordingConverter;
 import org.apache.openmeetings.core.converter.InterviewConverter;
@@ -182,12 +182,13 @@ public class StreamProcessor implements IStreamProcessor {
                try {
                        if (sender == null) {
                                KRoom room = kHandler.getRoom(c.getRoomId());
-                               sender = room.join(sd);
-                       }
-                       startBroadcast(sender, sd, msg.getString("sdpOffer"));
-                       if (StreamType.SCREEN == sd.getType() && 
sd.hasActivity(Activity.RECORD) && !isRecording(c.getRoomId())) {
-                               startRecording(c);
+                               sender = room.join(sd, kHandler);
                        }
+                       startBroadcast(sender, sd, msg.getString("sdpOffer"), 
() -> {
+                               if (StreamType.SCREEN == sd.getType() && 
sd.hasActivity(Activity.RECORD) && !isRecording(c.getRoomId())) {
+                                       startRecording(c);
+                               }
+                       });
                } catch (KurentoServerException e) {
                        sender.release(this);
                        WebSocketHelper.sendClient(c, newStoppedMsg(sd));
@@ -203,10 +204,11 @@ public class StreamProcessor implements IStreamProcessor {
         * @param stream Stream to start
         * @param sd StreamDesc to start
         * @param sdpOffer the sdpOffer
+        * @param then steps that need to be done after the broadcast is started
         * @return the current KStream
         */
-       KStream startBroadcast(KStream stream, StreamDesc sd, String sdpOffer) {
-               return stream.startBroadcast(this, sd, sdpOffer);
+       void startBroadcast(KStream stream, StreamDesc sd, String sdpOffer, 
Runnable then) {
+               stream.startBroadcast(this, sd, sdpOffer, then);
        }
 
        private static boolean isBroadcasting(final Client c) {
@@ -501,7 +503,7 @@ public class StreamProcessor implements IStreamProcessor {
                        }
                }
                if (c.getRoomId() != null) {
-                       getByRoom(c.getRoomId()).stream().forEach(stream -> 
stream.remove(c)); // listeners of existing streams should be cleaned-up
+                       getByRoom(c.getRoomId()).forEach(stream -> 
stream.remove(c)); // listeners of existing streams should be cleaned-up
                        checkStreams(c.getRoomId());
                }
        }
@@ -514,10 +516,9 @@ public class StreamProcessor implements IStreamProcessor {
                return streamByUid.values();
        }
 
-       Collection<KStream> getByRoom(Long roomId) {
+       Stream<KStream> getByRoom(Long roomId) {
                return streamByUid.values().stream()
-                               .filter(stream -> stream.getRoom() != null && 
stream.getRoom().getRoomId().equals(roomId))
-                               .collect(Collectors.toList());
+                               .filter(stream -> stream.getRoom() != null && 
stream.getRoom().getRoomId().equals(roomId));
        }
 
        Client getBySid(String sid) {
diff --git 
a/openmeetings-core/src/main/java/org/apache/openmeetings/core/sip/SipManager.java
 
b/openmeetings-core/src/main/java/org/apache/openmeetings/core/sip/SipManager.java
index 4e0f991..715718a 100644
--- 
a/openmeetings-core/src/main/java/org/apache/openmeetings/core/sip/SipManager.java
+++ 
b/openmeetings-core/src/main/java/org/apache/openmeetings/core/sip/SipManager.java
@@ -280,7 +280,7 @@ public class SipManager implements ISipManager, 
SipListenerExt {
                ConfbridgeListAction da = new ConfbridgeListAction(confno);
                ResponseEvents r = execEvent(da);
                if (r != null) {
-                       log.debug("SipManager::countUsers size == {}", 
r.getEvents().size());
+                       log.trace("SipManager::countUsers size == {}", 
r.getEvents().size());
                        // "- 1" here means: ListComplete event
                        return r.getEvents().size() - 1;
                }
diff --git 
a/openmeetings-core/src/test/java/org/apache/openmeetings/core/remote/TestRecordingFlowMocked.java
 
b/openmeetings-core/src/test/java/org/apache/openmeetings/core/remote/TestRecordingFlowMocked.java
index cf24bc3..c5d5b1e 100644
--- 
a/openmeetings-core/src/test/java/org/apache/openmeetings/core/remote/TestRecordingFlowMocked.java
+++ 
b/openmeetings-core/src/test/java/org/apache/openmeetings/core/remote/TestRecordingFlowMocked.java
@@ -91,7 +91,7 @@ class TestRecordingFlowMocked extends BaseMockedTest {
                doReturn(c.getRoom()).when(roomDao).get(ROOM_ID);
 
                // Mock out the methods that do webRTC
-               doReturn(null).when(streamProcessor).startBroadcast(any(), 
any(), any());
+               doReturn(null).when(streamProcessor).startBroadcast(any(), 
any(), any(), any());
 
        }
 
@@ -175,7 +175,7 @@ class TestRecordingFlowMocked extends BaseMockedTest {
                assertTrue(streamProcessor.isSharing(ROOM_ID));
 
                //verify startBroadcast has been invoked
-               verify(streamProcessor).startBroadcast(any(), any(), any());
+               verify(streamProcessor).startBroadcast(any(), any(), any(), 
any());
 
                // Assert that there is still just 1 stream and has only the 
activities to Record assigned
                assertEquals(1, c.getStreams().size());
diff --git 
a/openmeetings-web/src/main/java/org/apache/openmeetings/web/app/ClientManager.java
 
b/openmeetings-web/src/main/java/org/apache/openmeetings/web/app/ClientManager.java
index 4dfedcc..e4f38f0 100644
--- 
a/openmeetings-web/src/main/java/org/apache/openmeetings/web/app/ClientManager.java
+++ 
b/openmeetings-web/src/main/java/org/apache/openmeetings/web/app/ClientManager.java
@@ -313,7 +313,7 @@ public class ClientManager implements IClientManager {
                        .map(id -> onlineRooms.getOrDefault(id, Set.of()))
                        .stream()
                        .flatMap(Set::stream)
-                       .map(uid -> get(uid))
+                       .map(this::get)
                        .filter(Objects::nonNull);
        }
 
@@ -322,10 +322,8 @@ public class ClientManager implements IClientManager {
                        .map(id -> onlineRooms.getOrDefault(id, Set.of()))
                        .stream()
                        .flatMap(Set::stream)
-                       .map(uid -> get(uid))
-                       .filter(c -> c != null && c.sameUserId(userId))
-                       .findAny()
-                       .isPresent();
+                       .map(this::get)
+                       .anyMatch(c -> c != null && c.sameUserId(userId));
        }
 
        private List<Client> getByKeys(Long userId, String sessionId) {
diff --git 
a/openmeetings-web/src/main/java/org/apache/openmeetings/web/app/TimerService.java
 
b/openmeetings-web/src/main/java/org/apache/openmeetings/web/app/TimerService.java
index 799b5a1..2a2800b 100644
--- 
a/openmeetings-web/src/main/java/org/apache/openmeetings/web/app/TimerService.java
+++ 
b/openmeetings-web/src/main/java/org/apache/openmeetings/web/app/TimerService.java
@@ -82,7 +82,7 @@ public class TimerService {
                sipCheckMap.put(
                                roomId
                                , new CompletableFuture<>().completeAsync(() -> 
{
-                                       log.warn("Sip room check {}", roomId);
+                                       log.trace("Sip room check {}", roomId);
                                        Optional<Client> sipClient = 
cm.streamByRoom(roomId).filter(Client::isSip).findAny();
                                        
cm.streamByRoom(roomId).filter(Predicate.not(Client::isSip)).findAny().ifPresentOrElse(c
 -> {
                                                updateSipLastName(sipClient, 
c.getRoom());
diff --git 
a/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/RoomPanel.java
 
b/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/RoomPanel.java
index 4a4ae17..e0f9780 100644
--- 
a/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/RoomPanel.java
+++ 
b/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/RoomPanel.java
@@ -619,8 +619,7 @@ public class RoomPanel extends BasePanel {
                                handler.appendJavaScript("if (typeof(WbArea) === 'object') {WbArea.setRecStarted(true);}");
                        } else if (streamProcessor.recordingAllowed(getClient())) {
                                boolean hasStreams = cm.streamByRoom(r.getId())
-                                               .filter(cl -> !cl.getStreams().isEmpty())
-                                               .findAny().isPresent();
+                                               .anyMatch(cl -> !cl.getStreams().isEmpty());
                                handler.appendJavaScript(String.format("if (typeof(WbArea) === 'object') {WbArea.setRecStarted(false);WbArea.setRecEnabled(%s);}", hasStreams));
                        }
                }
@@ -636,8 +635,7 @@ public class RoomPanel extends BasePanel {
 
        public static boolean hasRight(ClientManager cm, long userId, long roomId, Right r) {
                return cm.streamByRoom(roomId)
-                               .filter(c -> c.sameUserId(userId) && c.hasRight(r))
-                               .findAny().isPresent();
+                               .anyMatch(c -> c.sameUserId(userId) && c.hasRight(r));
        }
 
        @Override
diff --git a/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/raw-video.js b/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/raw-video.js
index db083a4..215b114 100644
--- a/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/raw-video.js
+++ b/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/raw-video.js
@@ -6,6 +6,10 @@ var Video = (function() {
                , lm, level, userSpeaks = false, muteOthers
                , hasVideo, isSharing, isRecording;
 
+       function __getVideo(_state) {
+               const vid = self.video(_state);
+               return vid && vid.length > 0 ? vid[0] : null;
+       }
        function _resizeDlgArea(_w, _h) {
                if (Room.getOptions().interview) {
                        VideoUtil.setPos(v, VideoUtil.getPos());
@@ -153,7 +157,7 @@ var Video = (function() {
                        , onicecandidate: self.onIceCandidate
                };
                if (!isSharing) {
-                       state.options.localVideo = state.video[0];
+                       state.options.localVideo = __getVideo(state);
                }
                const data = state.data;
                data.rtcPeer = new kurentoUtils.WebRtcPeer.WebRtcPeerSendonly(
@@ -203,7 +207,7 @@ var Video = (function() {
        function _createResvPeer(msg, state) {
                __createVideo(state);
                const options = VideoUtil.addIceServers({
-                       remoteVideo : state.video[0]
+                       remoteVideo : __getVideo(state)
                        , onicecandidate : self.onIceCandidate
                }, msg);
                const data = state.data;
@@ -418,8 +422,9 @@ var Video = (function() {
                }
                state.video = $(hasVideo ? '<video>' : '<audio>').attr('id', 'vid' + _id)
                        .attr('playsinline', 'playsinline')
-                       .width(vc.width()).height(vc.height())
-                       .prop('autoplay', true).prop('controls', false);
+                       //.attr('autoplay', 'autoplay')
+                       .prop('controls', false)
+                       .width(vc.width()).height(vc.height());
                if (state.data) {
                        state.video.data(state.data);
                }
@@ -568,20 +573,22 @@ var Video = (function() {
                        return;
                }
                state.data.rtcPeer.processAnswer(answer, function (error) {
-                       if (true === this.cleaned || this.peerConnection.signalingState === 'stable') {
+                       if (true === this.cleaned) {
                                return;
                        }
-                       if (error) {
-                               return OmUtil.error(error);
-                       }
-                       if (state.video && state.video.paused) {
-                               state.video.play().catch(function (err) {
+                       const video = __getVideo(state);
+                       if (this.peerConnection.signalingState === 'stable' && video && video.paused) {
+                               video.play().catch(function (err) {
                                        if ('NotAllowedError' === err.name) {
                                                VideoUtil.askPermission(function () {
-                                                       state.video.play();
+                                                       video.play();
                                                });
                                        }
                                });
+                               return;
+                       }
+                       if (error) {
+                               return OmUtil.error(error);
                        }
                });
        }
@@ -625,8 +632,8 @@ var Video = (function() {
                });
        };
        self.reattachStream = _reattachStream;
-       self.video = function() {
-               const state = states.length > 0 ? states[0] : null;
+       self.video = function(_state) {
+               const state = _state || (states.length > 0 ? states[0] : null);
                if (!state || state.disposed) {
                        return null;
                }
diff --git a/openmeetings-web/src/test/java/org/apache/openmeetings/web/app/TestApplication.java b/openmeetings-web/src/test/java/org/apache/openmeetings/web/app/TestApplication.java
index 415d78c..985d45b 100644
--- a/openmeetings-web/src/test/java/org/apache/openmeetings/web/app/TestApplication.java
+++ b/openmeetings-web/src/test/java/org/apache/openmeetings/web/app/TestApplication.java
@@ -33,7 +33,7 @@ import org.apache.openmeetings.AbstractJUnitDefaults;
 import org.apache.openmeetings.db.dao.label.LabelDao;
 import org.junit.jupiter.api.Test;
 
-public class TestApplication extends AbstractJUnitDefaults {
+class TestApplication extends AbstractJUnitDefaults {
        @Test
        void testMissing() {
                assertEquals("[Missing]", app.getOmString("909", Locale.ENGLISH));

Reply via email to