Repository: zeppelin Updated Branches: refs/heads/master 89392a3f2 -> dfbea2eb9
[ZEPPELIN-1561] Improve sync for multiuser environment ### What is this PR for? Apply multi-tenancy to the storage sync mechanism. ### What type of PR is it? Bug Fix | Improvement ### Todos * [x] - broadcast on sync * [x] - set permissions for pulled notes * [x] - add test ### What is the Jira issue? [ZEPPELIN-1561](https://issues.apache.org/jira/browse/ZEPPELIN-1561) ### How should this be tested? Outline the steps to test the PR here. ### Screenshots (if appropriate) green CI ### Questions: * Do the license files need an update? no * Are there breaking changes for older versions? no * Does this need documentation? no Author: Khalid Huseynov <[email protected]> Closes #1537 from khalidhuseynov/improve/sync-multiuser and squashes the following commits: b3e6ed3 [Khalid Huseynov] add userAndRoles 0f2ade7 [Khalid Huseynov] reformat style bd1a44a [Khalid Huseynov] address comment + test 05afa2a [Khalid Huseynov] remove syncOnStart b104249 [Khalid Huseynov] add isAnonymous 1a54cc0 [Khalid Huseynov] set perms for pulling notes - make them private 585a675 [Khalid Huseynov] reload, sync and broadcast on login cd1c3fa [Khalid Huseynov] don't sync on start Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/dfbea2eb Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/dfbea2eb Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/dfbea2eb Branch: refs/heads/master Commit: dfbea2eb988e6b9ebf017d6a35f0ba590ce2873e Parents: 89392a3 Author: Khalid Huseynov <[email protected]> Authored: Sun Oct 23 21:52:23 2016 +0900 Committer: Anthony Corbacho <[email protected]> Committed: Mon Oct 24 18:07:21 2016 +0900 ---------------------------------------------------------------------- .../zeppelin/user/AuthenticationInfo.java | 20 ++++++ .../apache/zeppelin/realm/ZeppelinHubRealm.java | 10 +++ .../notebook/repo/NotebookRepoSync.java | 47 ++++++++++---- .../notebook/repo/NotebookRepoSyncTest.java | 68 
++++++++++++++++++++ 4 files changed, 134 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/zeppelin/blob/dfbea2eb/zeppelin-interpreter/src/main/java/org/apache/zeppelin/user/AuthenticationInfo.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/user/AuthenticationInfo.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/user/AuthenticationInfo.java index de41692..11d1562 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/user/AuthenticationInfo.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/user/AuthenticationInfo.java @@ -18,13 +18,20 @@ package org.apache.zeppelin.user; +import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /*** * */ public class AuthenticationInfo { + private static final Logger LOG = LoggerFactory.getLogger(AuthenticationInfo.class); String user; String ticket; UserCredentials userCredentials; + public static final AuthenticationInfo ANONYMOUS = new AuthenticationInfo("anonymous", + "anonymous"); public AuthenticationInfo() {} @@ -66,4 +73,17 @@ public class AuthenticationInfo { this.userCredentials = userCredentials; } + public static boolean isAnonymous(AuthenticationInfo subject) { + if (subject == null) { + LOG.warn("Subject is null, assuming anonymous. 
" + + "Not recommended to use subject as null except in tests"); + return true; + } + return subject.isAnonymous(); + } + + public boolean isAnonymous() { + return ANONYMOUS.equals(this) || "anonymous".equalsIgnoreCase(this.getUser()) + || StringUtils.isEmpty(this.getUser()); + } } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/dfbea2eb/zeppelin-server/src/main/java/org/apache/zeppelin/realm/ZeppelinHubRealm.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/realm/ZeppelinHubRealm.java b/zeppelin-server/src/main/java/org/apache/zeppelin/realm/ZeppelinHubRealm.java index cbe490d..67ed544 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/realm/ZeppelinHubRealm.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/realm/ZeppelinHubRealm.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.net.MalformedURLException; import java.net.URI; import java.net.URISyntaxException; +import java.util.HashSet; import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.httpclient.HttpClient; @@ -36,6 +37,7 @@ import org.apache.shiro.authc.UsernamePasswordToken; import org.apache.shiro.authz.AuthorizationInfo; import org.apache.shiro.realm.AuthorizingRealm; import org.apache.shiro.subject.PrincipalCollection; +import org.apache.zeppelin.server.ZeppelinServer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -135,6 +137,7 @@ public class ZeppelinHubRealm extends AuthorizingRealm { } responseBody = put.getResponseBodyAsString(); put.releaseConnection(); + } catch (IOException e) { LOG.error("Cannot login user", e); throw new AuthenticationException(e.getMessage()); @@ -147,6 +150,13 @@ public class ZeppelinHubRealm extends AuthorizingRealm { LOG.error("Cannot deserialize ZeppelinHub response to User instance", e); throw new AuthenticationException("Cannot login to ZeppelinHub"); } + + /* TODO(khalid): add proper roles and add listener 
*/ + HashSet<String> userAndRoles = new HashSet<String>(); + userAndRoles.add(account.login); + ZeppelinServer.notebookWsServer.broadcastReloadedNoteList( + new org.apache.zeppelin.user.AuthenticationInfo(account.login), userAndRoles); + return account; } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/dfbea2eb/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/NotebookRepoSync.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/NotebookRepoSync.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/NotebookRepoSync.java index fdf7e78..635d6f2 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/NotebookRepoSync.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/NotebookRepoSync.java @@ -24,8 +24,10 @@ import java.util.ArrayList; import java.util.Collections; import java.util.Date; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars; @@ -92,14 +94,6 @@ public class NotebookRepoSync implements NotebookRepo { LOG.info("No storages could be initialized, using default {} storage", defaultStorage); initializeDefaultStorage(conf); } - if (getRepoCount() > 1) { - try { - AuthenticationInfo subject = new AuthenticationInfo("anonymous"); - sync(0, 1, subject); - } catch (IOException e) { - LOG.warn("Failed to sync with secondary storage on start {}", e); - } - } } @SuppressWarnings("static-access") @@ -172,6 +166,10 @@ public class NotebookRepoSync implements NotebookRepo { /* TODO(khalid): handle case when removing from secondary storage fails */ } + void remove(int repoIndex, String noteId, AuthenticationInfo subject) throws IOException { + getRepo(repoIndex).remove(noteId, subject); + } + /** * Copies 
new/updated notes from source to destination storage * @@ -197,7 +195,7 @@ public class NotebookRepoSync implements NotebookRepo { for (String id : pushNoteIDs) { LOG.info("ID : " + id); } - pushNotes(subject, pushNoteIDs, srcRepo, dstRepo); + pushNotes(subject, pushNoteIDs, srcRepo, dstRepo, false); } else { LOG.info("Nothing to push"); } @@ -207,7 +205,7 @@ public class NotebookRepoSync implements NotebookRepo { for (String id : pullNoteIDs) { LOG.info("ID : " + id); } - pushNotes(subject, pullNoteIDs, dstRepo, srcRepo); + pushNotes(subject, pullNoteIDs, dstRepo, srcRepo, true); } else { LOG.info("Nothing to pull"); } @@ -230,16 +228,43 @@ public class NotebookRepoSync implements NotebookRepo { } private void pushNotes(AuthenticationInfo subject, List<String> ids, NotebookRepo localRepo, - NotebookRepo remoteRepo) { + NotebookRepo remoteRepo, boolean setPermissions) { for (String id : ids) { try { remoteRepo.save(localRepo.get(id, subject), subject); + if (setPermissions && emptyNoteAcl(id)) { + makePrivate(id, subject); + } } catch (IOException e) { LOG.error("Failed to push note to storage, moving onto next one", e); } } } + private boolean emptyNoteAcl(String noteId) { + NotebookAuthorization notebookAuthorization = NotebookAuthorization.getInstance(); + return notebookAuthorization.getOwners(noteId).isEmpty() + && notebookAuthorization.getReaders(noteId).isEmpty() + && notebookAuthorization.getWriters(noteId).isEmpty(); + } + + private void makePrivate(String noteId, AuthenticationInfo subject) { + if (AuthenticationInfo.isAnonymous(subject)) { + LOG.info("User is anonymous, permissions are not set for pulled notes"); + return; + } + NotebookAuthorization notebookAuthorization = NotebookAuthorization.getInstance(); + Set<String> users = notebookAuthorization.getOwners(noteId); + users.add(subject.getUser()); + notebookAuthorization.setOwners(noteId, users); + users = notebookAuthorization.getReaders(noteId); + users.add(subject.getUser()); + 
notebookAuthorization.setReaders(noteId, users); + users = notebookAuthorization.getWriters(noteId); + users.add(subject.getUser()); + notebookAuthorization.setWriters(noteId, users); + } + private void deleteNotes(AuthenticationInfo subject, List<String> ids, NotebookRepo repo) throws IOException { for (String id : ids) { http://git-wip-us.apache.org/repos/asf/zeppelin/blob/dfbea2eb/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/NotebookRepoSyncTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/NotebookRepoSyncTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/NotebookRepoSyncTest.java index 43ed586..ebd8ad8 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/NotebookRepoSyncTest.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/NotebookRepoSyncTest.java @@ -24,7 +24,9 @@ import static org.mockito.Mockito.mock; import java.io.File; import java.io.IOException; +import java.util.HashSet; import java.util.Map; +import java.util.Set; import org.apache.commons.io.FileUtils; import org.apache.zeppelin.conf.ZeppelinConfiguration; @@ -314,6 +316,72 @@ public class NotebookRepoSyncTest implements JobListenerFactory { notebookRepoSync.remove(note.getId(), anonymous); } + @Test + public void testSyncWithAcl() throws IOException { + /* scenario 1 - note exists with acl on main storage */ + AuthenticationInfo user1 = new AuthenticationInfo("user1"); + Note note = notebookSync.createNote(user1); + assertEquals(0, note.getParagraphs().size()); + + // saved on both storages + assertEquals(1, notebookRepoSync.list(0, null).size()); + assertEquals(1, notebookRepoSync.list(1, null).size()); + + /* check that user1 is the only owner */ + NotebookAuthorization authInfo = NotebookAuthorization.getInstance(); + Set<String> entity = new HashSet<String>(); + entity.add(user1.getUser()); + 
assertEquals(true, authInfo.isOwner(note.getId(), entity)); + assertEquals(1, authInfo.getOwners(note.getId()).size()); + assertEquals(0, authInfo.getReaders(note.getId()).size()); + assertEquals(0, authInfo.getWriters(note.getId()).size()); + + /* update note and save on secondary storage */ + Paragraph p1 = note.addParagraph(); + p1.setText("hello world"); + assertEquals(1, note.getParagraphs().size()); + notebookRepoSync.save(1, note, null); + + /* check paragraph isn't saved into first storage */ + assertEquals(0, notebookRepoSync.get(0, + notebookRepoSync.list(0, null).get(0).getId(), null).getParagraphs().size()); + /* check paragraph is saved into second storage */ + assertEquals(1, notebookRepoSync.get(1, + notebookRepoSync.list(1, null).get(0).getId(), null).getParagraphs().size()); + + /* now sync by user1 */ + notebookRepoSync.sync(user1); + + /* check that note updated and acl are same on main storage*/ + assertEquals(1, notebookRepoSync.get(0, + notebookRepoSync.list(0, null).get(0).getId(), null).getParagraphs().size()); + assertEquals(true, authInfo.isOwner(note.getId(), entity)); + assertEquals(1, authInfo.getOwners(note.getId()).size()); + assertEquals(0, authInfo.getReaders(note.getId()).size()); + assertEquals(0, authInfo.getWriters(note.getId()).size()); + + /* scenario 2 - note doesn't exist on main storage */ + /* remove from main storage */ + notebookRepoSync.remove(0, note.getId(), user1); + assertEquals(0, notebookRepoSync.list(0, null).size()); + assertEquals(1, notebookRepoSync.list(1, null).size()); + authInfo.removeNote(note.getId()); + assertEquals(0, authInfo.getOwners(note.getId()).size()); + assertEquals(0, authInfo.getReaders(note.getId()).size()); + assertEquals(0, authInfo.getWriters(note.getId()).size()); + + /* now sync - should bring note from secondary storage with added acl */ + notebookRepoSync.sync(user1); + assertEquals(1, notebookRepoSync.list(0, null).size()); + assertEquals(1, notebookRepoSync.list(1, null).size()); + 
assertEquals(1, authInfo.getOwners(note.getId()).size()); + assertEquals(1, authInfo.getReaders(note.getId()).size()); + assertEquals(1, authInfo.getWriters(note.getId()).size()); + assertEquals(true, authInfo.isOwner(note.getId(), entity)); + assertEquals(true, authInfo.isReader(note.getId(), entity)); + assertEquals(true, authInfo.isWriter(note.getId(), entity)); + } + static void delete(File file){ if(file.isFile()) file.delete(); else if(file.isDirectory()){
