This is an automated email from the ASF dual-hosted git repository. sebawagner pushed a commit to branch feature/OPENMEETINGS-2585-StreamProcessor-KurentoHandler-simplify in repository https://gitbox.apache.org/repos/asf/openmeetings.git
commit dee4532aa3f5df32705fadda8d5d1b93a4e37612 Author: Sebastian Wagner <[email protected]> AuthorDate: Fri Mar 5 16:06:39 2021 +1300 OPENMEETINGS-2585 Update StreamProcessor and KurentoHandler - moving methods out that are not related to stream or Kurento Handling. --- .../org/apache/openmeetings/core/remote/KRoom.java | 52 +++++++----- .../apache/openmeetings/core/remote/KStream.java | 28 ++++--- .../openmeetings/core/remote/KTestStream.java | 3 +- .../openmeetings/core/remote/KurentoHandler.java | 74 +++++------------ .../openmeetings/core/remote/KurentoUtil.java | 59 ++++++++++++++ .../openmeetings/core/remote/StreamProcessor.java | 93 ++++------------------ .../core/remote/StreamProcessorActions.java | 5 +- .../core/remote/TestStreamProcessor.java | 2 +- .../core/remote/TestRecordingFlowMocked.java | 6 +- .../core/remote/TestRoomFlowMocked.java | 2 +- .../openmeetings/db/entity/basic/Client.java | 37 +++++++++ .../openmeetings/web/common/OmWebSocketPanel.java | 2 +- .../apache/openmeetings/web/room/RoomPanel.java | 7 +- 13 files changed, 195 insertions(+), 175 deletions(-) diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KRoom.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KRoom.java index 3de644f..dbbe27f 100644 --- a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KRoom.java +++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KRoom.java @@ -20,7 +20,6 @@ package org.apache.openmeetings.core.remote; import static java.util.UUID.randomUUID; import static org.apache.openmeetings.core.remote.KurentoHandler.PARAM_ICE; -import static org.apache.openmeetings.core.remote.KurentoHandler.newKurentoMsg; import static org.apache.openmeetings.db.util.ApplicationHelper.ensureApplication; import java.util.Date; @@ -29,8 +28,10 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.lang3.time.FastDateFormat; import org.apache.openmeetings.IApplication; +import org.apache.openmeetings.core.sip.SipManager; import org.apache.openmeetings.core.util.WebSocketHelper; import org.apache.openmeetings.db.dao.record.RecordingChunkDao; +import org.apache.openmeetings.db.dao.record.RecordingDao; import org.apache.openmeetings.db.entity.basic.Client; import org.apache.openmeetings.db.entity.basic.Client.Activity; import org.apache.openmeetings.db.entity.basic.Client.StreamDesc; @@ -55,8 +56,12 @@ import com.github.openjson.JSONObject; public class KRoom { private static final Logger log = LoggerFactory.getLogger(KRoom.class); + private final KurentoHandler kHandler; private final StreamProcessor processor; private final RecordingChunkDao chunkDao; + private final SipManager sipManager; + private final IClientManager cm; + private final RecordingDao recDao; private final Room room; private final AtomicBoolean recordingStarted = new AtomicBoolean(false); private final AtomicBoolean sharingStarted = new AtomicBoolean(false); @@ -65,9 +70,14 @@ public class KRoom { private JSONObject recordingUser = new JSONObject(); private JSONObject sharingUser = new JSONObject(); - public KRoom(KurentoHandler handler, Room r) { - this.processor = handler.getStreamProcessor(); - this.chunkDao = handler.getChunkDao(); + public KRoom(KurentoHandler handler, Room r, IClientManager cm, RecordingDao recDao, + RecordingChunkDao chunkDao, StreamProcessor streamProcessor, SipManager sipManager) { + this.kHandler = handler; + this.processor = streamProcessor; + this.chunkDao = chunkDao; + this.recDao = recDao; + this.sipManager = sipManager; + this.cm = cm; this.room = r; log.info("ROOM {} has been created", room.getId()); } @@ -86,14 +96,14 @@ public class KRoom { public KStream join(final StreamDesc sd, KurentoHandler kHandler) { log.info("ROOM {}: join client {}, stream: {}", room.getId(), sd.getClient(), sd.getUid()); - final KStream stream = new KStream(sd, this, kHandler); + final KStream stream = new KStream(sd, this, kHandler, cm, processor, sipManager); processor.addStream(stream); return stream; } public void onStopBroadcast(KStream stream) { processor.release(stream, true); - WebSocketHelper.sendAll(newKurentoMsg() + WebSocketHelper.sendAll(KurentoUtil.newKurentoMsg() .put("id", "broadcastStopped") .put("uid", stream.getUid()) .toString() @@ -144,11 +154,11 @@ public class KRoom { Optional<StreamDesc> osd = c.getScreenStream(); if (osd.isPresent()) { osd.get().addActivity(Activity.RECORD); - processor.getClientManager().update(c); + cm.update(c); rec.setWidth(osd.get().getWidth()); rec.setHeight(osd.get().getHeight()); } - rec = processor.getRecordingDao().update(rec); + rec = recDao.update(rec); // Receive recordingId recordingId = rec.getId(); processor.getByRoom(room.getId()).forEach(KStream::startRecord); @@ -163,9 +173,9 @@ public class KRoom { if (recordingStarted.compareAndSet(true, false)) { log.debug("##REC:: recording in room {} is stopping {} ::", room.getId(), recordingId); processor.getByRoom(room.getId()).forEach(KStream::stopRecord); - Recording rec = processor.getRecordingDao().get(recordingId); + Recording rec = recDao.get(recordingId); rec.setRecordEnd(new Date()); - rec = processor.getRecordingDao().update(rec); + rec = recDao.update(rec); recordingUser = new JSONObject(); recordingId = null; @@ -178,8 +188,8 @@ public class KRoom { Optional<StreamDesc> osd = c.getScreenStream(); if (osd.isPresent()) { osd.get().removeActivity(Activity.RECORD); - processor.getClientManager().update(c); - processor.getHandler().sendShareUpdated(osd.get()); + cm.update(c); + kHandler.sendShareUpdated(osd.get()); } } // Send notification to all users that the recording has been started @@ -205,27 +215,26 @@ public class KRoom { public void startSharing(StreamProcessor processor, IClientManager cm, Client c, Optional<StreamDesc> osd, JSONObject msg, Activity a) { StreamDesc sd; - KurentoHandler h = processor.getHandler(); if (sharingStarted.compareAndSet(false, true)) { sharingUser.put("sid", c.getSid()); sd = c.addStream(StreamType.SCREEN, a); cm.update(c); log.debug("Stream.UID {}: sharing has been started, activity: {}", sd.getUid(), a); - h.sendClient(sd.getSid(), newKurentoMsg() + kHandler.sendClient(sd.getSid(), KurentoUtil.newKurentoMsg() .put("id", "broadcast") .put("stream", sd.toJson() .put("shareType", msg.getString("shareType")) .put("fps", msg.getString("fps"))) - .put(PARAM_ICE, h.getTurnServers(c))); + .put(PARAM_ICE, kHandler.getTurnServers(c))); } else if (osd.isPresent() && !osd.get().hasActivity(a)) { sd = osd.get(); sd.addActivity(a); cm.update(c); - h.sendShareUpdated(sd); + kHandler.sendShareUpdated(sd); WebSocketHelper.sendRoom(new TextRoomMessage(c.getRoomId(), c, RoomMessage.Type.RIGHT_UPDATED, c.getUid())); - WebSocketHelper.sendRoomOthers(room.getId(), c.getUid(), newKurentoMsg() + WebSocketHelper.sendRoomOthers(room.getId(), c.getUid(), KurentoUtil.newKurentoMsg() .put("id", "newStream") - .put(PARAM_ICE, processor.getHandler().getTurnServers(c)) + .put(PARAM_ICE, kHandler.getTurnServers(c)) .put("stream", sd.toJson())); } } @@ -245,17 +254,16 @@ public class KRoom { if (count != sipCount) { processor.getByRoom(room.getId()).forEach(stream -> stream.addSipProcessor(count)); if (sipCount == 0) { - processor.getClientManager() - .streamByRoom(room.getId()) + cm.streamByRoom(room.getId()) .filter(Client::isSip) .findAny() .ifPresent(c -> { StreamDesc sd = c.addStream(StreamType.WEBCAM, Activity.AUDIO); sd.setWidth(120).setHeight(90); c.restoreActivities(sd); - KStream stream = join(sd, processor.getHandler()); + KStream stream = join(sd, kHandler); stream.startBroadcast(sd, "", () -> {}); - processor.getClientManager().update(c); + cm.update(c); }); } sipCount = count; diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KStream.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KStream.java index aaef0ca..a801d8f 100644 --- a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KStream.java +++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KStream.java @@ -28,7 +28,6 @@ import static org.apache.openmeetings.core.remote.KurentoHandler.PARAM_ICE; import static org.apache.openmeetings.core.remote.KurentoHandler.TAG_ROOM; import static org.apache.openmeetings.core.remote.KurentoHandler.TAG_STREAM_UID; import static org.apache.openmeetings.core.remote.KurentoHandler.getFlowoutTimeout; -import static org.apache.openmeetings.core.remote.KurentoHandler.newKurentoMsg; import static org.apache.openmeetings.util.OmFileHelper.getRecUri; import static org.apache.openmeetings.util.OmFileHelper.getRecordingChunk; @@ -44,6 +43,7 @@ import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import org.apache.openmeetings.core.sip.ISipCallbacks; +import org.apache.openmeetings.core.sip.SipManager; import org.apache.openmeetings.core.sip.SipStackProcessor; import org.apache.openmeetings.core.util.WebSocketHelper; import org.apache.openmeetings.db.entity.basic.Client; @@ -51,6 +51,7 @@ import org.apache.openmeetings.db.entity.basic.Client.Activity; import org.apache.openmeetings.db.entity.basic.Client.StreamDesc; import org.apache.openmeetings.db.entity.basic.Client.StreamType; import org.apache.openmeetings.db.entity.record.RecordingChunk.Type; +import org.apache.openmeetings.db.manager.IClientManager; import org.apache.openmeetings.db.util.ws.RoomMessage; import org.apache.openmeetings.db.util.ws.TextRoomMessage; import org.apache.openmeetings.util.OmFileHelper; @@ -76,9 +77,12 @@ public class KStream extends AbstractStream implements ISipCallbacks { private static final Logger log = LoggerFactory.getLogger(KStream.class); private final KurentoHandler kHandler; + private final StreamProcessor streamProcessor; + private final SipManager sipManager; private final KRoom kRoom; private final Date connectedSince; private final StreamType streamType; + private final IClientManager cm; private MediaProfileSpecType profile; private MediaPipeline pipeline; private RecorderEndpoint recorder; @@ -96,12 +100,16 @@ public class KStream extends AbstractStream implements ISipCallbacks { private boolean hasScreen; private boolean sipClient; - public KStream(final StreamDesc sd, KRoom kRoom, KurentoHandler kHandler) { + public KStream(final StreamDesc sd, KRoom kRoom, KurentoHandler kHandler, + IClientManager cm, StreamProcessor streamProcessor, SipManager sipManager) { super(sd.getSid(), sd.getUid()); this.kRoom = kRoom; + this.streamProcessor = streamProcessor; + this.sipManager = sipManager; streamType = sd.getType(); this.connectedSince = new Date(); this.kHandler = kHandler; + this.cm = cm; //TODO Min/MaxVideoSendBandwidth //TODO Min/Max Audio/Video RecvBandwidth } @@ -189,7 +197,7 @@ public class KStream extends AbstractStream implements ISipCallbacks { flowoutFuture = Optional.of(new CompletableFuture<>().completeAsync(() -> { log.warn("KStream will be dropped {}, sid {}, uid {}", sd, sid, uid); if (StreamType.SCREEN == streamType) { - kHandler.getStreamProcessor().doStopSharing(sid, uid); + streamProcessor.doStopSharing(sid, uid); } stopBroadcast(); return null; @@ -213,7 +221,7 @@ public class KStream extends AbstractStream implements ISipCallbacks { Client c = sd.getClient(); WebSocketHelper.sendRoom(new TextRoomMessage(c.getRoomId(), c, RoomMessage.Type.RIGHT_UPDATED, c.getUid())); if (hasAudio || hasVideo || hasScreen) { - WebSocketHelper.sendRoomOthers(getRoomId(), c.getUid(), newKurentoMsg() + WebSocketHelper.sendRoomOthers(getRoomId(), c.getUid(), KurentoUtil.newKurentoMsg() .put("id", "newStream") .put(PARAM_ICE, kHandler.getTurnServers(c)) .put("stream", sd.toJson())); @@ -252,7 +260,7 @@ public class KStream extends AbstractStream implements ISipCallbacks { ((WebRtcEndpoint)endpoint).gatherCandidates(); // this one might throw Exception } log.trace("USER {}: SdpAnswer is {}", this.uid, sdpAnswer); - kHandler.sendClient(sid, newKurentoMsg() + kHandler.sendClient(sid, KurentoUtil.newKurentoMsg() .put("id", "videoResponse") .put("uid", this.uid) .put("sdpAnswer", sdpAnswer)); @@ -275,7 +283,7 @@ public class KStream extends AbstractStream implements ISipCallbacks { listeners.put(uid, listener); log.debug("PARTICIPANT {}: obtained endpoint for {}", uid, this.uid); - Client cur = kHandler.getStreamProcessor().getBySid(this.sid); + Client cur = cm.getBySid(this.sid); if (cur == null) { log.warn("Client for endpoint dooesn't exists"); } else { @@ -311,7 +319,7 @@ public class KStream extends AbstractStream implements ISipCallbacks { reApplyIceCandiates(endpoint, recv); endpoint.addIceCandidateFoundListener(evt -> kHandler.sendClient(sid - , newKurentoMsg() + , KurentoUtil.newKurentoMsg() .put("id", "iceCandidate") .put("uid", KStream.this.uid) .put(PARAM_CANDIDATE, convert(JsonUtils.toJsonObject(evt.getCandidate())))) @@ -450,7 +458,7 @@ public class KStream extends AbstractStream implements ISipCallbacks { outgoingMedia = null; } if (remove) { - kHandler.getStreamProcessor().release(this, false); + streamProcessor.release(this, false); } } @@ -588,7 +596,7 @@ public class KStream extends AbstractStream implements ISipCallbacks { if (count > 0) { if (sipProcessor.isEmpty()) { try { - sipProcessor = kHandler.getSipManager().createSipStackProcessor( + sipProcessor = sipManager.createSipStackProcessor( randomUUID().toString() , kRoom.getRoom() , this); @@ -626,7 +634,7 @@ public class KStream extends AbstractStream implements ISipCallbacks { answerConsumer.accept(answer); log.debug(answer); if (sipClient) { - StreamDesc sd = kHandler.getStreamProcessor().getBySid(sid).getStream(uid); + StreamDesc sd = cm.getBySid(sid).getStream(uid); try { outgoingMedia = rtpEndpoint; internalStartBroadcast(sd, sdp); diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KTestStream.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KTestStream.java index 741c1f6..fc8ef9a 100644 --- a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KTestStream.java +++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KTestStream.java @@ -23,7 +23,6 @@ import static org.apache.openmeetings.core.remote.KurentoHandler.MODE_TEST; import static org.apache.openmeetings.core.remote.KurentoHandler.PARAM_CANDIDATE; import static org.apache.openmeetings.core.remote.KurentoHandler.TAG_MODE; import static org.apache.openmeetings.core.remote.KurentoHandler.TAG_ROOM; -import static org.apache.openmeetings.core.remote.KurentoHandler.sendError; import static org.apache.openmeetings.core.remote.TestStreamProcessor.newTestKurentoMsg; import static org.apache.openmeetings.util.OmFileHelper.EXTENSION_WEBM; import static org.apache.openmeetings.util.OmFileHelper.TEST_SETUP_PREFIX; @@ -126,7 +125,7 @@ public class KTestStream extends AbstractStream { @Override public void onError(Throwable cause) throws Exception { - sendError(c, "Failed to start recording"); + KurentoUtil.sendError(c, "Failed to start recording"); log.error("Failed to start recording", cause); } }); diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KurentoHandler.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KurentoHandler.java index 1e8ebb4..e8ebad1 100644 --- a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KurentoHandler.java +++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KurentoHandler.java @@ -20,6 +20,9 @@ package org.apache.openmeetings.core.remote; import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.apache.openmeetings.core.remote.KurentoUtil.newKurentoMsg; +import static org.apache.openmeetings.core.remote.KurentoUtil.sendError; +import static org.apache.openmeetings.core.remote.KurentoUtil.tagsAsMap; import java.security.InvalidKeyException; import java.security.NoSuchAlgorithmException; @@ -34,7 +37,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.stream.Collectors; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; @@ -44,13 +46,12 @@ import javax.crypto.spec.SecretKeySpec; import org.apache.openmeetings.core.sip.SipManager; import org.apache.openmeetings.core.util.WebSocketHelper; import org.apache.openmeetings.db.dao.record.RecordingChunkDao; +import org.apache.openmeetings.db.dao.record.RecordingDao; import org.apache.openmeetings.db.dao.room.RoomDao; import org.apache.openmeetings.db.entity.basic.Client; -import org.apache.openmeetings.db.entity.basic.Client.Activity; import org.apache.openmeetings.db.entity.basic.Client.StreamDesc; import org.apache.openmeetings.db.entity.basic.IWsClient; import org.apache.openmeetings.db.entity.room.Room; -import org.apache.openmeetings.db.entity.room.Room.Right; import org.apache.openmeetings.db.entity.user.User; import org.apache.openmeetings.db.manager.IClientManager; import org.apache.openmeetings.db.util.ws.RoomMessage; @@ -60,13 +61,11 @@ import org.kurento.client.Continuation; import org.kurento.client.Endpoint; import org.kurento.client.EventListener; import org.kurento.client.KurentoClient; -import org.kurento.client.MediaObject; import org.kurento.client.MediaPipeline; import org.kurento.client.ObjectCreatedEvent; import org.kurento.client.PlayerEndpoint; import org.kurento.client.RecorderEndpoint; import org.kurento.client.RtpEndpoint; -import org.kurento.client.Tag; import org.kurento.client.Transaction; import org.kurento.client.WebRtcEndpoint; import org.kurento.jsonrpc.client.JsonRpcClientNettyWebSocket; @@ -92,7 +91,6 @@ public class KurentoHandler { public static final String TAG_STREAM_UID = "streamUid"; private static final String HMAC_SHA1_ALGORITHM = "HmacSHA1"; private final ScheduledExecutorService kmsRecheckScheduler = Executors.newScheduledThreadPool(1); - public static final String KURENTO_TYPE = "kurento"; private static int flowoutTimeout = 5; @Value("${kurento.ws.url}") private String kurentoWsUrl; @@ -127,6 +125,8 @@ public class KurentoHandler { @Autowired private RecordingChunkDao chunkDao; @Autowired + private RecordingDao recDao; + @Autowired private TestStreamProcessor testProcessor; @Autowired private StreamProcessor streamProcessor; @@ -141,6 +141,21 @@ public class KurentoHandler { return connctd; } + public boolean isSharing(Long roomId) { + if (!isConnected()) { + return false; + } + return getRoom(roomId).isSharing(); + } + + public boolean screenShareAllowed(Client c) { + if (!isConnected()) { + return false; + } + Room r = c.getRoom(); + return c.hasRightsToShare() && !isSharing(r.getId()); + } + @PostConstruct public void init() { check = () -> { @@ -224,11 +239,6 @@ public class KurentoHandler { } } - private static Map<String, String> tagsAsMap(MediaObject pipe) { - return pipe.getTags().stream() - .collect(Collectors.toMap(Tag::getKey, Tag::getValue)); - } - Transaction beginTransaction() { return client.beginTransaction(); } @@ -279,12 +289,6 @@ public class KurentoHandler { WebSocketHelper.sendClient(cm.getBySid(sid), msg); } - public static void sendError(IWsClient c, String msg) { - WebSocketHelper.sendClient(c, newKurentoMsg() - .put("id", "error") - .put("message", msg)); - } - public void remove(IWsClient c) { if (!isConnected() || c == null) { return; @@ -309,7 +313,7 @@ public class KurentoHandler { return rooms.computeIfAbsent(roomId, k -> { log.debug("Room {} does not exist. Will create now!", roomId); Room r = roomDao.get(roomId); - return new KRoom(this, r); + return new KRoom(this, r, cm, recDao, chunkDao, streamProcessor, sipManager); }); } @@ -321,28 +325,6 @@ public class KurentoHandler { getRoom(r.getId()).updateSipCount(count); } - static JSONObject newKurentoMsg() { - return new JSONObject().put("type", KURENTO_TYPE); - } - - public static boolean activityAllowed(Client c, Activity a, Room room) { - boolean r = false; - switch (a) { - case AUDIO: - r = c.hasRight(Right.AUDIO); - break; - case VIDEO: - r = !room.isAudioOnly() && c.hasRight(Right.VIDEO); - break; - case AUDIO_VIDEO: - r = !room.isAudioOnly() && c.hasRight(Right.AUDIO) && c.hasRight(Right.VIDEO); - break; - default: - break; - } - return r; - } - public JSONArray getTurnServers(Client c) { return getTurnServers(c, false); } @@ -401,18 +383,6 @@ public class KurentoHandler { return testProcessor; } - StreamProcessor getStreamProcessor() { - return streamProcessor; - } - - SipManager getSipManager() { - return sipManager; - } - - RecordingChunkDao getChunkDao() { - return chunkDao; - } - static int getFlowoutTimeout() { return flowoutTimeout; } diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KurentoUtil.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KurentoUtil.java new file mode 100644 index 0000000..6557c10 --- /dev/null +++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KurentoUtil.java @@ -0,0 +1,59 @@ +/* + * (C) Copyright 2014 Kurento (http://kurento.org/) + */ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License") + you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.openmeetings.core.remote; + +import java.util.Map; +import java.util.stream.Collectors; + +import org.apache.openmeetings.core.util.WebSocketHelper; +import org.apache.openmeetings.db.entity.basic.IWsClient; +import org.apache.openmeetings.db.entity.basic.Client.StreamDesc; +import org.kurento.client.MediaObject; +import org.kurento.client.Tag; + +import com.github.openjson.JSONObject; + +public class KurentoUtil { + + public static final String KURENTO_TYPE = "kurento"; + + static JSONObject newKurentoMsg() { + return new JSONObject().put("type", KURENTO_TYPE); + } + + static JSONObject newStoppedMsg(StreamDesc sd) { + return newKurentoMsg() + .put("id", "broadcastStopped") + .put("uid", sd.getUid()); + } + + static Map<String, String> tagsAsMap(MediaObject pipe) { + return pipe.getTags().stream() + .collect(Collectors.toMap(Tag::getKey, Tag::getValue)); + } + + public static void sendError(IWsClient c, String msg) { + WebSocketHelper.sendClient(c, newKurentoMsg() + .put("id", "error") + .put("message", msg)); + } +} diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/StreamProcessor.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/StreamProcessor.java index 3771c0c..bbac89a 100644 --- a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/StreamProcessor.java +++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/StreamProcessor.java @@ -20,8 +20,6 @@ package org.apache.openmeetings.core.remote; import static org.apache.openmeetings.core.remote.KurentoHandler.PARAM_ICE; -import static org.apache.openmeetings.core.remote.KurentoHandler.activityAllowed; -import static org.apache.openmeetings.core.remote.KurentoHandler.newKurentoMsg; import static org.apache.openmeetings.util.OpenmeetingsVariables.isRecordingsEnabled; import java.util.Collection; @@ -37,15 +35,12 @@ import org.apache.openmeetings.core.converter.IRecordingConverter; import org.apache.openmeetings.core.converter.InterviewConverter; import org.apache.openmeetings.core.converter.RecordingConverter; import org.apache.openmeetings.core.util.WebSocketHelper; -import org.apache.openmeetings.db.dao.record.RecordingDao; import org.apache.openmeetings.db.entity.basic.Client; import org.apache.openmeetings.db.entity.basic.Client.Activity; import org.apache.openmeetings.db.entity.basic.Client.StreamDesc; import org.apache.openmeetings.db.entity.basic.Client.StreamType; import org.apache.openmeetings.db.entity.record.Recording; import org.apache.openmeetings.db.entity.room.Room; -import org.apache.openmeetings.db.entity.room.Room.Right; -import org.apache.openmeetings.db.entity.room.Room.RoomElement; import org.apache.openmeetings.db.manager.IClientManager; import org.apache.openmeetings.db.util.ws.RoomMessage; import org.apache.openmeetings.db.util.ws.TextRoomMessage; @@ -70,8 +65,6 @@ public class StreamProcessor implements IStreamProcessor { @Autowired private IClientManager cm; @Autowired - private RecordingDao recDao; - @Autowired private KurentoHandler kHandler; @Autowired private TaskExecutor taskExecutor; @@ -119,7 +112,7 @@ public class StreamProcessor implements IStreamProcessor { break; case "wannaShare": osd = c.getScreenStream(); - if (screenShareAllowed(c) || (osd.isPresent() && !osd.get().hasActivity(Activity.SCREEN))) { + if (kHandler.screenShareAllowed(c) || (osd.isPresent() && !osd.get().hasActivity(Activity.SCREEN))) { startSharing(c, osd, msg, Activity.SCREEN); } break; @@ -131,7 +124,7 @@ public class StreamProcessor implements IStreamProcessor { log.warn("This shouldn't be called for interview room"); break; } - boolean sharing = isSharing(r.getId()); + boolean sharing = kHandler.isSharing(r.getId()); startSharing(c, osd, msg, Activity.RECORD); if (sharing) { startRecording(c); @@ -167,10 +160,6 @@ public class StreamProcessor implements IStreamProcessor { stream.startBroadcast(sd, sdpOffer, then); } - private static boolean isBroadcasting(final Client c) { - return c.hasAnyActivity(Activity.AUDIO, Activity.VIDEO); - } - private Set<String> cleanWebCams(Client c, List<StreamDesc> streams) { Set<String> closed = new HashSet<>(); streams.stream() @@ -193,8 +182,8 @@ public class StreamProcessor implements IStreamProcessor { return; } - if (activityAllowed(c, a, c.getRoom())) { - boolean wasBroadcasting = isBroadcasting(c); + if (c.activityAllowed(a)) { + boolean wasBroadcasting = c.isBroadcasting(); if (a == Activity.AUDIO && !c.isMicEnabled()) { return; } @@ -206,7 +195,7 @@ public class StreamProcessor implements IStreamProcessor { } c.toggle(a); List<StreamDesc> streams = c.getStreams(); - if (!isBroadcasting(c)) { + if (!c.isBroadcasting()) { Set<String> closed = cleanWebCams(c, streams); if (!closed.isEmpty()) { cm.update(c); @@ -218,7 +207,7 @@ public class StreamProcessor implements IStreamProcessor { Set<String> closed = wasBroadcasting ? cleanWebCams(c, streams) : Set.of(); cm.update(c.restoreActivities(sd)); log.debug("User {}: has started broadcast", sd.getUid()); - kHandler.sendClient(sd.getSid(), newKurentoMsg() + kHandler.sendClient(sd.getSid(), KurentoUtil.newKurentoMsg() .put("id", "broadcast") .put("stream", sd.toJson(true)) .put("cleanup", new JSONArray(closed)) @@ -240,10 +229,10 @@ public class StreamProcessor implements IStreamProcessor { public void rightsUpdated(Client c) { Optional<StreamDesc> osd = c.getScreenStream(); - if (osd.isPresent() && !hasRightsToShare(c)) { + if (osd.isPresent() && !c.hasRightsToShare()) { stopSharing(c, osd.get().getUid()); } - if (isBroadcasting(c)) { + if (c.isBroadcasting()) { constraintsChanged(c); } else { c.getStreams().stream() @@ -288,20 +277,6 @@ public class StreamProcessor implements IStreamProcessor { } // Sharing - public boolean hasRightsToShare(Client c) { - if (!kHandler.isConnected()) { - return false; - } - Room r = c.getRoom(); - return r != null && Room.Type.INTERVIEW != r.getType() - && !r.isHidden(RoomElement.SCREEN_SHARING) - && c.hasRight(Right.SHARE); - } - - public boolean screenShareAllowed(Client c) { - Room r = c.getRoom(); - return hasRightsToShare(c) && !isSharing(r.getId()); - } private void errorSharing(Client c) { if (!kHandler.isConnected()) { @@ -329,16 +304,16 @@ public class StreamProcessor implements IStreamProcessor { /** * Execute Pausing of sharing. * - * Invoked and overwritten by Mock, hance package private. + * Invoked and overwritten by Mock, hence package private. * * @param c client * @param uid the uid */ void pauseSharing(Client c, String uid) { - if (!hasRightsToShare(c)) { + if (!c.hasRightsToShare()) { return; } - if (!isSharing(c.getRoomId())) { + if (!kHandler.isSharing(c.getRoomId())) { return; } if (isRecording(c.getRoomId())) { @@ -348,7 +323,7 @@ public class StreamProcessor implements IStreamProcessor { KStream sender = getByUid(uid); sender.pauseSharing(); kHandler.sendShareUpdated(sd); - WebSocketHelper.sendRoomOthers(c.getRoomId(), c.getUid(), newStoppedMsg(sd)); + WebSocketHelper.sendRoomOthers(c.getRoomId(), c.getUid(), KurentoUtil.newStoppedMsg(sd)); } else { stopSharing(c, uid); } @@ -365,7 +340,7 @@ public class StreamProcessor implements IStreamProcessor { } StreamDesc doStopSharing(String sid, String uid) { - return doStopSharing(getBySid(sid), uid); + return doStopSharing(cm.getBySid(sid), uid); } private StreamDesc doStopSharing(Client c, String uid) { @@ -385,37 +360,24 @@ public class StreamProcessor implements IStreamProcessor { return sd; } - public boolean isSharing(Long roomId) { - if (!kHandler.isConnected()) { - return false; - } - return kHandler.getRoom(roomId).isSharing(); - } - // Recording - public boolean hasRightsToRecord(Client c) { - Room r = c.getRoom(); - return isRecordingsEnabled() && r != null && r.isAllowRecording() && c.hasRight(Right.MODERATOR); - } - public boolean recordingAllowed(Client c) { if (!kHandler.isConnected() || !isRecordingsEnabled()) { return false; } - Room r = c.getRoom(); - return hasRightsToRecord(c) && !isRecording(r.getId()); + return c.hasRightsToRecord() && !isRecording(c.getRoom().getId()); } public void startRecording(Client c) { - if (!kHandler.isConnected() || !hasRightsToRecord(c)) { + if (!kHandler.isConnected() || !c.hasRightsToRecord()) { return; } kHandler.getRoom(c.getRoomId()).startRecording(c); } public void stopRecording(Client c) { - if (!kHandler.isConnected() || !hasRightsToRecord(c)) { + if (!kHandler.isConnected() || !c.hasRightsToRecord()) { return; } kHandler.getRoom(c.getRoomId()).stopRecording(c); @@ -452,7 +414,7 @@ public class StreamProcessor implements IStreamProcessor { AbstractStream s = getByUid(sd.getUid()); if (s != null) { s.release(); - WebSocketHelper.sendRoomOthers(c.getRoomId(), c.getUid(), newStoppedMsg(sd)); + WebSocketHelper.sendRoomOthers(c.getRoomId(), c.getUid(), KurentoUtil.newStoppedMsg(sd)); } } if (c.getRoomId() != null) { @@ -474,10 +436,6 @@ public class StreamProcessor implements IStreamProcessor { .filter(stream -> stream.getRoomId().equals(roomId)); } - Client getBySid(String sid) { - return cm.getBySid(sid); - } - public boolean hasStream(String uid) { return streamByUid.get(uid) != null; } @@ -486,18 +444,6 @@ public class StreamProcessor implements IStreamProcessor { return uid == null ? null : streamByUid.get(uid); } - KurentoHandler getHandler() { - return kHandler; - } - - IClientManager getClientManager() { - return cm; - } - - RecordingDao getRecordingDao() { - return recDao; - } - @Override public void release(AbstractStream stream, boolean releaseStream) { final String uid = stream.getUid(); @@ -528,9 +474,4 @@ public class StreamProcessor implements IStreamProcessor { } } - protected static JSONObject newStoppedMsg(StreamDesc sd) { - return newKurentoMsg() - .put("id", "broadcastStopped") - .put("uid", sd.getUid()); - } } diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/StreamProcessorActions.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/StreamProcessorActions.java index 8608485..a797735 100644 --- a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/StreamProcessorActions.java +++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/StreamProcessorActions.java @@ -20,7 +20,6 @@ package org.apache.openmeetings.core.remote; import static org.apache.openmeetings.core.remote.KurentoHandler.PARAM_CANDIDATE; -import static org.apache.openmeetings.core.remote.KurentoHandler.sendError; import org.apache.openmeetings.core.util.WebSocketHelper; import org.apache.openmeetings.db.entity.basic.Client; @@ -120,8 +119,8 @@ public class StreamProcessorActions { }); } catch (KurentoServerException e) { sender.release(); - WebSocketHelper.sendClient(c, StreamProcessor.newStoppedMsg(sd)); - sendError(c, "Failed to start broadcast: " + e.getMessage()); + WebSocketHelper.sendClient(c, KurentoUtil.newStoppedMsg(sd)); + KurentoUtil.sendError(c, "Failed to start broadcast: " + e.getMessage()); log.error("Failed to start broadcast", e); } } diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/TestStreamProcessor.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/TestStreamProcessor.java index 499d772..d902b7d 100644 --- a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/TestStreamProcessor.java +++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/TestStreamProcessor.java @@ -89,7 +89,7 @@ class TestStreamProcessor implements IStreamProcessor { } static JSONObject newTestKurentoMsg() { - return KurentoHandler.newKurentoMsg().put(TAG_MODE, MODE_TEST); + return KurentoUtil.newKurentoMsg().put(TAG_MODE, MODE_TEST); } void remove(IWsClient c) { diff --git a/openmeetings-core/src/test/java/org/apache/openmeetings/core/remote/TestRecordingFlowMocked.java b/openmeetings-core/src/test/java/org/apache/openmeetings/core/remote/TestRecordingFlowMocked.java index 377d48b..3b07635 100644 --- a/openmeetings-core/src/test/java/org/apache/openmeetings/core/remote/TestRecordingFlowMocked.java +++ b/openmeetings-core/src/test/java/org/apache/openmeetings/core/remote/TestRecordingFlowMocked.java @@ -156,7 +156,7 @@ class TestRecordingFlowMocked extends BaseMockedTest { // This should enable the sharing, but for enabling recording it should wait for // the event that broadcast was started assertFalse(streamProcessor.isRecording(ROOM_ID)); - assertTrue(streamProcessor.isSharing(ROOM_ID)); + assertTrue(handler.isSharing(ROOM_ID)); // Get current Stream, there should be only 1 KStream created as result of this assertEquals(1, c.getStreams().size()); @@ -177,7 +177,7 @@ class TestRecordingFlowMocked extends BaseMockedTest { // Assert that stream AND recording is true assertTrue(streamProcessor.isRecording(ROOM_ID)); - assertTrue(streamProcessor.isSharing(ROOM_ID)); + assertTrue(handler.isSharing(ROOM_ID)); //verify startBroadcast has been invoked verify(streamProcessor).startBroadcast(any(), any(), any(), any()); @@ -204,7 +204,7 @@ class TestRecordingFlowMocked extends BaseMockedTest { doReturn(true).when(streamProcessor).startConvertion(any(Recording.class)); // Needed for stopping, needs to stop by sid - doReturn(c).when(streamProcessor).getBySid(c.getSid()); + doReturn(c).when(cm).getBySid(c.getSid()); JSONObject msg = new JSONObject(MSG_BASE.toString()) .put("id", "stopRecord") diff --git a/openmeetings-core/src/test/java/org/apache/openmeetings/core/remote/TestRoomFlowMocked.java b/openmeetings-core/src/test/java/org/apache/openmeetings/core/remote/TestRoomFlowMocked.java index d120f9b..ef1acd4 100644 --- a/openmeetings-core/src/test/java/org/apache/openmeetings/core/remote/TestRoomFlowMocked.java +++ b/openmeetings-core/src/test/java/org/apache/openmeetings/core/remote/TestRoomFlowMocked.java @@ -152,7 +152,7 @@ class TestRoomFlowMocked extends BaseMockedTest { Client c = getClientFull(); doReturn(c.getRoom()).when(roomDao).get(ROOM_ID); handler.onMessage(c, msg); - assertTrue(streamProcessor.isSharing(ROOM_ID)); + assertTrue(handler.isSharing(ROOM_ID)); handler.onMessage(c, msg); }); } diff --git a/openmeetings-db/src/main/java/org/apache/openmeetings/db/entity/basic/Client.java b/openmeetings-db/src/main/java/org/apache/openmeetings/db/entity/basic/Client.java index 06082fd..382790e 100644 --- a/openmeetings-db/src/main/java/org/apache/openmeetings/db/entity/basic/Client.java +++ b/openmeetings-db/src/main/java/org/apache/openmeetings/db/entity/basic/Client.java @@ -20,6 +20,7 @@ package org.apache.openmeetings.db.entity.basic; import static java.util.UUID.randomUUID; import static org.apache.openmeetings.util.OmFileHelper.SIP_USER_ID; +import static org.apache.openmeetings.util.OpenmeetingsVariables.isRecordingsEnabled; import java.io.Serializable; import java.util.ArrayList; @@ -38,6 +39,7 @@ import org.apache.openmeetings.db.dao.user.UserDao; import org.apache.openmeetings.db.entity.IDataProviderEntity; import org.apache.openmeetings.db.entity.room.Room; import org.apache.openmeetings.db.entity.room.Room.Right; +import org.apache.openmeetings.db.entity.room.Room.RoomElement; import org.apache.openmeetings.db.entity.user.User; import org.apache.wicket.util.string.Strings; @@ -140,6 +142,22 @@ public class Client implements IDataProviderEntity, IWsClient { streams.clear(); } + public boolean isBroadcasting() { + return hasAnyActivity(Activity.AUDIO, Activity.VIDEO); + } + + public boolean hasRightsToRecord() { + Room r = getRoom(); + return isRecordingsEnabled() && r != null && r.isAllowRecording() && hasRight(Right.MODERATOR); + } + + public boolean hasRightsToShare() { + Room r = getRoom(); + return r != null && Room.Type.INTERVIEW != r.getType() + && !r.isHidden(RoomElement.SCREEN_SHARING) + && hasRight(Right.SHARE); + } + public boolean hasRight(Right right) { if (Right.SUPER_MODERATOR == right) { return rights.contains(right); @@ -184,6 +202,25 @@ public class Client implements IDataProviderEntity, IWsClient { return activities.contains(a); } + public boolean activityAllowed(Activity a) { + Room room = getRoom(); + boolean r = false; + switch (a) { + case AUDIO: + r = hasRight(Right.AUDIO); + break; + case VIDEO: + r = !room.isAudioOnly() && hasRight(Right.VIDEO); + break; + case AUDIO_VIDEO: + r = !room.isAudioOnly() && hasRight(Right.AUDIO) && hasRight(Right.VIDEO); + break; + default: + break; + } + return r; + } + public Client toggle(Activity a) { if (hasActivity(a)) { remove(a); diff --git a/openmeetings-web/src/main/java/org/apache/openmeetings/web/common/OmWebSocketPanel.java b/openmeetings-web/src/main/java/org/apache/openmeetings/web/common/OmWebSocketPanel.java index 14dedea..fe56dc4 100644 --- a/openmeetings-web/src/main/java/org/apache/openmeetings/web/common/OmWebSocketPanel.java +++ b/openmeetings-web/src/main/java/org/apache/openmeetings/web/common/OmWebSocketPanel.java @@ -18,7 +18,7 @@ */ package org.apache.openmeetings.web.common; -import static org.apache.openmeetings.core.remote.KurentoHandler.KURENTO_TYPE; +import static org.apache.openmeetings.core.remote.KurentoUtil.KURENTO_TYPE; import static org.apache.openmeetings.web.app.WebSession.getUserId; import java.io.IOException; diff --git a/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/RoomPanel.java b/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/RoomPanel.java index 2798914..2e24569 100644 --- a/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/RoomPanel.java +++ b/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/RoomPanel.java @@ -20,7 +20,6 @@ package org.apache.openmeetings.web.room; import static de.agilecoders.wicket.core.markup.html.bootstrap.dialog.Modal.BUTTON_MARKUP_ID; import static java.time.Duration.ZERO; -import static org.apache.openmeetings.core.remote.KurentoHandler.activityAllowed; import static org.apache.openmeetings.core.util.ChatWebSocketHelper.ID_USER_PREFIX; import static org.apache.openmeetings.db.entity.calendar.Appointment.allowedStart; import static org.apache.openmeetings.util.OmFileHelper.EXTENSION_PDF; @@ -782,10 +781,10 @@ public class RoomPanel extends BasePanel { if (!avInited) { avInited = true; if (Room.Type.CONFERENCE == r.getType()) { - if (!activityAllowed(c, Client.Activity.AUDIO, c.getRoom())) { + if (!c.activityAllowed(Client.Activity.AUDIO)) { c.allow(Room.Right.AUDIO); } - if (!c.getRoom().isAudioOnly() && !activityAllowed(c, Client.Activity.VIDEO, c.getRoom())) { + if (!c.getRoom().isAudioOnly() && !c.activityAllowed(Client.Activity.VIDEO)) { c.allow(Room.Right.VIDEO); } streamProcessor.toggleActivity(c, c.getRoom().isAudioOnly() @@ -816,7 +815,7 @@ public class RoomPanel extends BasePanel { public boolean screenShareAllowed() { Client c = getClient(); - return c.getScreenStream().isPresent() || streamProcessor.screenShareAllowed(c); + return c.getScreenStream().isPresent() || kHandler.screenShareAllowed(c); } public RoomSidebar getSidebar() {
