Repository: incubator-impala
Updated Branches:
  refs/heads/master dad489669 -> 6604083f5


IMPALA-5355: Fix the order of Sentry roles and privileges

After a single Impalad is restarted, it is possible that the order in
which it receives roles and privileges from the statestore is incorrect.
The correct order is for the role to appear first in the update, before
the privilege that references it.

If a user updates a role, its catalog version number can become larger
than the catalog version numbers of the privileges that reference it.
This causes the role to come after the privilege in the initial
statestore update.

The issue is fixed by doing two passes over the catalog objects in the
Impalad. The first pass updates the top level objects. The second pass
updates the dependent objects.

Testing:
- Added a test that reproduced the problem.

Change-Id: I7072e95b74952ce5a51ea1b6e2ae3e80fb0940e0
Reviewed-on: http://gerrit.cloudera.org:8080/7004
Reviewed-by: Taras Bobrovytsky <[email protected]>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/6604083f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/6604083f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/6604083f

Branch: refs/heads/master
Commit: 6604083f518d332f3f13601e61a9860c23cab4ca
Parents: dad4896
Author: Taras Bobrovytsky <[email protected]>
Authored: Fri May 26 17:14:05 2017 -0700
Committer: Impala Public Jenkins <[email protected]>
Committed: Wed Jun 7 00:30:46 2017 +0000

----------------------------------------------------------------------
 .../apache/impala/catalog/ImpaladCatalog.java   | 48 +++++++++++++++-----
 tests/authorization/test_grant_revoke.py        | 43 ++++++++++++++++++
 tests/common/impala_test_suite.py               | 12 ++---
 3 files changed, 86 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6604083f/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java 
b/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java
index 45db6a2..e6826b7 100644
--- a/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java
@@ -101,21 +101,33 @@ public class ImpaladCatalog extends Catalog {
   }
 
   /**
+   * Returns true if the given object does not depend on any other object 
already
+   * existing in the catalog in order to be added.
+   */
+  private boolean isTopLevelCatalogObject(TCatalogObject catalogObject) {
+    return catalogObject.getType() == TCatalogObjectType.DATABASE||
+        catalogObject.getType() == TCatalogObjectType.DATA_SOURCE ||
+        catalogObject.getType() == TCatalogObjectType.HDFS_CACHE_POOL ||
+        catalogObject.getType() == TCatalogObjectType.ROLE;
+  }
+
+  /**
    * Updates the internal Catalog based on the given TCatalogUpdateReq.
    * This method:
-   * 1) Updates all databases in the Catalog
-   * 2) Updates all tables, views, and functions in the Catalog
-   * 3) Removes all dropped tables, views, and functions
-   * 4) Removes all dropped databases
+   * 1) Updates all top level objects (such as databases and roles).
+   * 2) Updates all objects that depend on top level objects (such as 
functions, tables,
+   *    privileges).
+   * 3) Removes all dropped catalog objects.
    *
    * This method is called once per statestore heartbeat and is guaranteed the 
same
    * object will not be in both the "updated" list and the "removed" list (it 
is
    * a detail handled by the statestore).
-   * Catalog updates are ordered by the object type with the dependent objects 
coming
-   * first. That is, database "foo" will always come before table "foo.bar".
-   * Synchronized because updateCatalog() can be called by during a statestore 
update or
-   * during a direct-DDL operation and catalogServiceId_ and 
lastSyncedCatalogVersion_
-   * must be protected.
+   * Catalog objects are ordered by version, which is not necessarily the same 
as ordering
+   * by dependency. This is handled by doing two passes and first updating the 
top level
+   * objects, followed by updating the dependent objects. This method is 
synchronized
+   * because updateCatalog() can be called by during a statestore update or 
during a
+   * direct-DDL operation and catalogServiceId_ and lastSyncedCatalogVersion_ 
must be
+   * protected.
    */
   public synchronized TUpdateCatalogCacheResponse updateCatalog(
     TUpdateCatalogCacheRequest req) throws CatalogException {
@@ -130,12 +142,26 @@ public class ImpaladCatalog extends Catalog {
       }
     }
 
-    // First process all updates
+    // Process updates to top level objects first because they don't depend on 
any other
+    // objects already existing in the catalog.
     long newCatalogVersion = lastSyncedCatalogVersion_;
     for (TCatalogObject catalogObject: req.getUpdated_objects()) {
       if (catalogObject.getType() == TCatalogObjectType.CATALOG) {
         newCatalogVersion = catalogObject.getCatalog_version();
-      } else {
+      } else if (isTopLevelCatalogObject(catalogObject)) {
+        try {
+          addCatalogObject(catalogObject);
+        } catch (Exception e) {
+          LOG.error("Error adding catalog object: " + e.getMessage(), e);
+        }
+      }
+    }
+
+    // Process updates to dependent objects next. Since the top level objects 
were already
+    // processed, we are guaranteed that the top level objects that the 
dependent objects
+    // depend on exist in the catalog.
+    for (TCatalogObject catalogObject: req.getUpdated_objects()) {
+      if (!isTopLevelCatalogObject(catalogObject)) {
         try {
           addCatalogObject(catalogObject);
         } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6604083f/tests/authorization/test_grant_revoke.py
----------------------------------------------------------------------
diff --git a/tests/authorization/test_grant_revoke.py 
b/tests/authorization/test_grant_revoke.py
index 1a191ba..f6b8420 100644
--- a/tests/authorization/test_grant_revoke.py
+++ b/tests/authorization/test_grant_revoke.py
@@ -21,10 +21,12 @@ import grp
 import pytest
 from getpass import getuser
 from os import getenv
+from time import sleep
 
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
 from tests.common.impala_test_suite import ImpalaTestSuite
 from tests.common.test_dimensions import create_uncompressed_text_dimension
+from tests.verifiers.metric_verifier import MetricVerifier
 
 SENTRY_CONFIG_FILE = getenv('IMPALA_HOME') + 
'/fe/src/test/resources/sentry-site.xml'
 
@@ -71,9 +73,50 @@ class TestGrantRevoke(CustomClusterTestSuite, 
ImpalaTestSuite):
     finally:
       self.client.execute("drop role grant_revoke_test_admin")
 
+  @classmethod
+  def restart_first_impalad(cls):
+    impalad = cls.cluster.impalads[0]
+    impalad.restart()
+    cls.client = impalad.service.create_beeswax_client()
+
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
       impalad_args="--server_name=server1",
       catalogd_args="--sentry_config=" + SENTRY_CONFIG_FILE)
   def test_grant_revoke(self, vector):
     self.run_test_case('QueryTest/grant_revoke', vector, use_db="default")
+
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+      impalad_args="--server_name=server1",
+      catalogd_args="--sentry_config=" + SENTRY_CONFIG_FILE,
+      statestored_args=("--statestore_heartbeat_frequency_ms=300 "
+                        "--statestore_update_frequency_ms=300"))
+  def test_role_update(self, vector):
+    """IMPALA-5355: The initial update from the statestore has the privileges 
and roles in
+    reverse order if a role was modified, but not the associated privilege. 
Verify that
+    Impala is able to handle this.
+    """
+    self.client.execute("create role test_role")
+    self.client.execute("grant all on server to test_role")
+    # Wait a few seconds to make sure the update propagates to the statestore.
+    sleep(3)
+    # Update the role, increasing its catalog verion.
+    self.client.execute("grant role test_role to group {0}".format(
+        grp.getgrnam(getuser()).gr_name))
+    result = self.client.execute("show tables in functional")
+    assert 'alltypes' in result.data
+    privileges_before = self.client.execute("show grant role test_role")
+    # Wait a few seconds before restarting Impalad to make sure that the 
Catalog gets
+    # updated.
+    sleep(3)
+    self.restart_first_impalad()
+    verifier = MetricVerifier(self.cluster.impalads[0].service)
+    verifier.wait_for_metric("catalog.ready", True)
+    # Verify that we still have the right privileges after the first impalad 
was
+    # restarted.
+    result = self.client.execute("show tables in functional");
+    assert 'alltypes' in result.data
+    privileges_after = self.client.execute("show grant role test_role")
+    assert privileges_before.data == privileges_after.data

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6604083f/tests/common/impala_test_suite.py
----------------------------------------------------------------------
diff --git a/tests/common/impala_test_suite.py 
b/tests/common/impala_test_suite.py
index 3af1ed0..a297e6c 100644
--- a/tests/common/impala_test_suite.py
+++ b/tests/common/impala_test_suite.py
@@ -180,16 +180,16 @@ class ImpalaTestSuite(BaseTestSuite):
     return hdfs_client
 
   @classmethod
-  def all_db_names(self):
-    results = self.client.execute("show databases").data
+  def all_db_names(cls):
+    results = cls.client.execute("show databases").data
     # Extract first column - database name
     return [row.split("\t")[0] for row in results]
 
   @classmethod
-  def cleanup_db(self, db_name, sync_ddl=1):
-    self.client.execute("use default")
-    self.client.set_configuration({'sync_ddl': sync_ddl})
-    self.client.execute("drop database if exists `" + db_name + "` cascade")
+  def cleanup_db(cls, db_name, sync_ddl=1):
+    cls.client.execute("use default")
+    cls.client.set_configuration({'sync_ddl': sync_ddl})
+    cls.client.execute("drop database if exists `" + db_name + "` cascade")
 
   def __restore_query_options(self, query_options_changed, impalad_client):
     """

Reply via email to