This is an automated email from the ASF dual-hosted git repository.

sebawagner pushed a commit to branch 
feature/OPENMEETINGS-2315-refactor-kstream-to-streamprocessor
in repository https://gitbox.apache.org/repos/asf/openmeetings.git

commit a51b4fcd16fd7885d4e1754c7cee8af40e1a0c4a
Author: Sebastian Wagner <sebawag...@apache.org>
AuthorDate: Thu Apr 30 09:27:44 2020 +1200

    OPENMEETINGS-2315 Only StreamProcessor to hold reference of KStream
---
 .../org/apache/openmeetings/core/remote/KRoom.java | 75 +++++++++-------------
 .../apache/openmeetings/core/remote/KStream.java   |  2 +-
 .../openmeetings/core/remote/KurentoHandler.java   |  6 +-
 .../openmeetings/core/remote/StreamProcessor.java  | 22 +++++--
 .../core/remote/TestStreamProcessor.java           |  1 +
 .../web/admin/connection/ConnectionsPanel.java     |  8 ---
 6 files changed, 52 insertions(+), 62 deletions(-)

diff --git 
a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KRoom.java
 
b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KRoom.java
index 47abfdd..fc414dc 100644
--- 
a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KRoom.java
+++ 
b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KRoom.java
@@ -27,9 +27,7 @@ import static 
org.apache.openmeetings.core.remote.KurentoHandler.newKurentoMsg;
 
 import java.util.Collection;
 import java.util.Date;
-import java.util.Map;
 import java.util.Optional;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.openmeetings.core.util.WebSocketHelper;
@@ -53,10 +51,16 @@ import org.slf4j.LoggerFactory;
 
 import com.github.openjson.JSONObject;
 
+/**
+ * Bean object dynamically created representing a conference room on the 
MediaServer
+ *
+ */
 public class KRoom {
        private static final Logger log = LoggerFactory.getLogger(KRoom.class);
-
-       private final Map<String, KStream> streams = new ConcurrentHashMap<>();
+       /**
+        * Not injected by annotation but by constructor.
+        */
+       private final StreamProcessor streamProcessor;
        private final MediaPipeline pipeline;
        private final Long roomId;
        private final Room.Type type;
@@ -67,7 +71,8 @@ public class KRoom {
        private JSONObject recordingUser = new JSONObject();
        private JSONObject sharingUser = new JSONObject();
 
-       public KRoom(Room r, MediaPipeline pipeline, RecordingChunkDao 
chunkDao) {
+       public KRoom(StreamProcessor streamProcessor, Room r, MediaPipeline 
pipeline, RecordingChunkDao chunkDao) {
+               this.streamProcessor = streamProcessor;
                this.roomId = r.getId();
                this.type = r.getType();
                this.pipeline = pipeline;
@@ -98,16 +103,16 @@ public class KRoom {
        public KStream join(final StreamDesc sd) {
                log.info("ROOM {}: join client {}, stream: {}", roomId, 
sd.getClient(), sd.getUid());
                final KStream stream = new KStream(sd, this);
-               streams.put(stream.getUid(), stream);
+               streamProcessor.addStream(stream);
                return stream;
        }
 
        public Collection<KStream> getParticipants() {
-               return streams.values();
+               return streamProcessor.getStreamsByRoom(this.getRoomId());
        }
 
        public void onStopBroadcast(KStream stream, final StreamProcessor 
processor) {
-               streams.remove(stream.getUid());
+               streamProcessor.release(stream);
                stream.release(processor);
                WebSocketHelper.sendAll(newKurentoMsg()
                                .put("id", "broadcastStopped")
@@ -118,21 +123,6 @@ public class KRoom {
                //FIXME TODO permission can be removed, some listener might be 
required
        }
 
-       public void leave(final StreamProcessor processor, final Client c) {
-               for (Map.Entry<String, KStream> e : streams.entrySet()) {
-                       e.getValue().remove(c);
-               }
-               for (StreamDesc sd : c.getStreams()) {
-                       if (StreamType.SCREEN == sd.getType()) {
-
-                       }
-                       KStream stream = streams.remove(sd.getUid());
-                       if (stream != null) {
-                               stream.release(processor);
-                       }
-               }
-       }
-
        public boolean isRecording() {
                return recordingStarted.get();
        }
@@ -141,7 +131,7 @@ public class KRoom {
                return new JSONObject(recordingUser.toString());
        }
 
-       public void startRecording(StreamProcessor processor, Client c) {
+       public void startRecording(Client c) {
                if (recordingStarted.compareAndSet(false, true)) {
                        log.debug("##REC:: recording in room {} is starting 
::", roomId);
                        Room r = c.getRoom();
@@ -173,16 +163,16 @@ public class KRoom {
                        Optional<StreamDesc> osd = c.getScreenStream();
                        if (osd.isPresent()) {
                                osd.get().addActivity(Activity.RECORD);
-                               processor.getClientManager().update(c);
+                               streamProcessor.getClientManager().update(c);
                                rec.setWidth(osd.get().getWidth());
                                rec.setHeight(osd.get().getHeight());
                        }
-                       rec = processor.getRecordingDao().update(rec);
+                       rec = streamProcessor.getRecordingDao().update(rec);
                        // Receive recordingId
                        recordingId = rec.getId();
-                       for (final KStream stream : streams.values()) {
-                               stream.startRecord(processor);
-                       }
+                       
streamProcessor.getStreamsByRoom(this.getRoomId()).forEach(
+                                       stream -> 
stream.startRecord(streamProcessor)
+                       );
 
                        // Send notification to all users that the recording 
has been started
                        WebSocketHelper.sendRoom(new RoomMessage(roomId, u, 
RoomMessage.Type.RECORDING_TOGGLED));
@@ -190,19 +180,19 @@ public class KRoom {
                }
        }
 
-       public void stopRecording(final StreamProcessor processor, Client c) {
+       public void stopRecording(Client c) {
                if (recordingStarted.compareAndSet(true, false)) {
                        log.debug("##REC:: recording in room {} is stopping {} 
::", roomId, recordingId);
-                       for (final KStream stream : streams.values()) {
-                               stream.stopRecord();
-                       }
-                       Recording rec = 
processor.getRecordingDao().get(recordingId);
+                       
streamProcessor.getStreamsByRoom(this.getRoomId()).forEach(
+                                       stream -> stream.stopRecord()
+                       );
+                       Recording rec = 
streamProcessor.getRecordingDao().get(recordingId);
                        rec.setRecordEnd(new Date());
-                       rec = processor.getRecordingDao().update(rec);
+                       rec = streamProcessor.getRecordingDao().update(rec);
                        recordingUser = new JSONObject();
                        recordingId = null;
 
-                       processor.startConvertion(rec);
+                       streamProcessor.startConvertion(rec);
                        User u;
                        if (c == null) {
                                u = new User();
@@ -211,8 +201,8 @@ public class KRoom {
                                Optional<StreamDesc> osd = c.getScreenStream();
                                if (osd.isPresent()) {
                                        
osd.get().removeActivity(Activity.RECORD);
-                                       processor.getClientManager().update(c);
-                                       
processor.getHandler().sendShareUpdated(osd.get());
+                                       
streamProcessor.getClientManager().update(c);
+                                       
streamProcessor.getHandler().sendShareUpdated(osd.get());
                                }
                        }
                        // Send notification to all users that the recording 
has been started
@@ -270,11 +260,10 @@ public class KRoom {
                }
        }
 
-       public void close(final StreamProcessor processor) {
-               for (final KStream stream : streams.values()) {
-                       stream.release(processor);
-               }
-               streams.clear();
+       public void close() {
+               streamProcessor.getStreamsByRoom(this.getRoomId()).forEach(
+                               stream -> stream.release(streamProcessor)
+               );
                pipeline.release(new Continuation<Void>() {
                        @Override
                        public void onSuccess(Void result) throws Exception {
diff --git 
a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KStream.java
 
b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KStream.java
index 397d25f..e7cb42b 100644
--- 
a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KStream.java
+++ 
b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KStream.java
@@ -144,7 +144,7 @@ public class KStream extends AbstractStream {
                        }
                });
                outgoingMedia.addMediaFlowInStateChangeListener(evt -> 
log.warn("Media FlowIn :: {}", evt));
-               processor.addStream(this);
+               // processor.addStream(this);
                addListener(processor, sd.getSid(), sd.getUid(), sdpOffer);
                if (room.isRecording()) {
                        startRecord(processor);
diff --git 
a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KurentoHandler.java
 
b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KurentoHandler.java
index ade553e..e7d60405 100644
--- 
a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KurentoHandler.java
+++ 
b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KurentoHandler.java
@@ -142,7 +142,7 @@ public class KurentoHandler {
                        kuid = randomUUID().toString(); // will be changed to 
prevent double events
                        client.destroy();
                        for (Entry<Long, KRoom> e : rooms.entrySet()) {
-                               e.getValue().close(streamProcessor);
+                               e.getValue().close();
                        }
                        testProcessor.destroy();
                        streamProcessor.destroy();
@@ -238,7 +238,7 @@ public class KurentoHandler {
                        pipe.addTag(t, TAG_KUID, kuid);
                        pipe.addTag(t, TAG_ROOM, String.valueOf(roomId));
                        t.commit();
-                       room = new KRoom(r, pipe, chunkDao);
+                       room = new KRoom(streamProcessor, r, pipe, chunkDao);
                        rooms.put(roomId, room);
                }
                log.debug("Room {} found!", roomId);
@@ -426,7 +426,7 @@ public class KurentoHandler {
                                                        return;
                                                } else if (r != null) {
                                                        
rooms.remove(r.getRoomId());
-                                                       
r.close(streamProcessor);
+                                                       r.close();
                                                }
                                        }
                                        log.warn("Invalid MediaPipeline {} 
detected, will be dropped, tags: {}", pipe.getId(), tags);
diff --git 
a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/StreamProcessor.java
 
b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/StreamProcessor.java
index 32cda74..779928a 100644
--- 
a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/StreamProcessor.java
+++ 
b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/StreamProcessor.java
@@ -25,6 +25,7 @@ import static 
org.apache.openmeetings.core.remote.KurentoHandler.activityAllowed
 import static org.apache.openmeetings.core.remote.KurentoHandler.newKurentoMsg;
 import static org.apache.openmeetings.core.remote.KurentoHandler.sendError;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
@@ -65,6 +66,9 @@ import com.github.openjson.JSONObject;
 @Component
 public class StreamProcessor implements IStreamProcessor {
        private static final Logger log = 
LoggerFactory.getLogger(StreamProcessor.class);
+       /**
+        * Holds a reference to the current streams available on the server 
instance
+        */
        private final Map<String, KStream> streamByUid = new 
ConcurrentHashMap<>();
 
        @Autowired
@@ -305,7 +309,7 @@ public class StreamProcessor implements IStreamProcessor {
                                room.stopSharing();
                                if (Room.Type.INTERVIEW != room.getType() && 
room.isRecording()) {
                                        log.info("No more screen streams in the 
non-interview room, stopping recording");
-                                       room.stopRecording(this, null);
+                                       room.stopRecording(null);
                                }
                        }
                }
@@ -315,7 +319,7 @@ public class StreamProcessor implements IStreamProcessor {
                                        .collect(Collectors.toList());
                        if (streams.isEmpty()) {
                                log.info("No more streams in the room, stopping 
recording");
-                               room.stopRecording(this, null);
+                               room.stopRecording(null);
                        }
                }
        }
@@ -447,14 +451,14 @@ public class StreamProcessor implements IStreamProcessor {
                if (!kHandler.isConnected() || !hasRightsToRecord(c)) {
                        return;
                }
-               kHandler.getRoom(c.getRoomId()).startRecording(this, c);
+               kHandler.getRoom(c.getRoomId()).startRecording(c);
        }
 
        public void stopRecording(Client c) {
                if (!kHandler.isConnected() || !hasRightsToRecord(c)) {
                        return;
                }
-               kHandler.getRoom(c.getRoomId()).stopRecording(this, c);
+               kHandler.getRoom(c.getRoomId()).stopRecording(c);
 
                // In case this user wasn't shareing his screen we also need to 
close that one
                c.getScreenStream().ifPresent(sd -> {
@@ -494,13 +498,11 @@ public class StreamProcessor implements IStreamProcessor {
                        }
                }
                if (c.getRoomId() != null) {
-                       KRoom room = kHandler.getRoom(c.getRoomId());
-                       room.leave(this, c);
                        checkStreams(c.getRoomId());
                }
        }
 
-       void addStream(KStream stream) {
+       public void addStream(KStream stream) {
                streamByUid.put(stream.getUid(), stream);
        }
 
@@ -508,6 +510,12 @@ public class StreamProcessor implements IStreamProcessor {
                return streamByUid.values();
        }
 
+       public Collection<KStream> getStreamsByRoom(Long roomId) {
+               return streamByUid.values().stream()
+                               .filter(stream -> stream.getRoom() != null && 
stream.getRoom().getRoomId().equals(roomId))
+                               
.collect(Collectors.toCollection(ArrayList::new));
+       }
+
        Client getBySid(String sid) {
                return cm.getBySid(sid);
        }
diff --git 
a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/TestStreamProcessor.java
 
b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/TestStreamProcessor.java
index e3a171d..764e458 100644
--- 
a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/TestStreamProcessor.java
+++ 
b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/TestStreamProcessor.java
@@ -126,4 +126,5 @@ public class TestStreamProcessor implements 
IStreamProcessor {
                }
                streamByUid.clear();
        }
+
 }
diff --git 
a/openmeetings-web/src/main/java/org/apache/openmeetings/web/admin/connection/ConnectionsPanel.java
 
b/openmeetings-web/src/main/java/org/apache/openmeetings/web/admin/connection/ConnectionsPanel.java
index 84cc14c..270f809 100644
--- 
a/openmeetings-web/src/main/java/org/apache/openmeetings/web/admin/connection/ConnectionsPanel.java
+++ 
b/openmeetings-web/src/main/java/org/apache/openmeetings/web/admin/connection/ConnectionsPanel.java
@@ -90,14 +90,6 @@ public class ConnectionsPanel extends AdminBasePanel {
                                l.addAll(streams);
                                log.info("Retrieve all Streams, StreamProcessor 
has {} of streams", streams.size());
 
-                               List<KStreamDto> missing = kHandler.getRooms()
-                                               .stream()
-                                               .flatMap(room -> 
room.getParticipants().stream())
-                                               .filter(stream -> 
!streamProcessor.hasStream(stream.getUid()))
-                                               .map(kStream -> new 
KStreamDto("KRoom", kStream))
-                                               .collect(Collectors.toList());
-                               l.addAll(missing);
-                               log.warn("Following streams were in KRoom but 
not in StreamProcessor: {}", missing);
                                return l;
                        }
 

Reply via email to