[
https://issues.apache.org/jira/browse/HIVE-23351?focusedWorklogId=432643&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-432643
]
ASF GitHub Bot logged work on HIVE-23351:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 11/May/20 06:06
Start Date: 11/May/20 06:06
Worklog Time Spent: 10m
Work Description: pkumarsinha commented on a change in pull request #1004:
URL: https://github.com/apache/hive/pull/1004#discussion_r422688212
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
##########
@@ -144,6 +145,11 @@ public int execute() {
     if (shouldDump(previousValidHiveDumpPath)) {
       Path currentDumpPath = getCurrentDumpPath(dumpRoot, isBootstrap);
       Path hiveDumpRoot = new Path(currentDumpPath, ReplUtils.REPL_HIVE_BASE_DIR);
+      work.setCurrentDumpPath(currentDumpPath);
+      if (shouldDumpAuthorizationMetadata()) {
+        LOG.info("Initiate authorization metadata dump provided by {} ", RANGER_AUTHORIZER);
+        initiateAuthorizationDumpTask(currentDumpPath);
Review comment:
No need to pass this argument; use work.getCurrentDumpPath() inside the method instead.
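
For illustration, the call site could then look like this (a sketch only; it assumes the work object exposes the getter the way this comment suggests):

    work.setCurrentDumpPath(currentDumpPath);
    if (shouldDumpAuthorizationMetadata()) {
      // No argument needed: the method reads work.getCurrentDumpPath() itself.
      initiateAuthorizationDumpTask();
    }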
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
##########
@@ -144,6 +145,11 @@ public int execute() {
     if (shouldDump(previousValidHiveDumpPath)) {
       Path currentDumpPath = getCurrentDumpPath(dumpRoot, isBootstrap);
       Path hiveDumpRoot = new Path(currentDumpPath, ReplUtils.REPL_HIVE_BASE_DIR);
+      work.setCurrentDumpPath(currentDumpPath);
+      if (shouldDumpAuthorizationMetadata()) {
+        LOG.info("Initiate authorization metadata dump provided by {} ", RANGER_AUTHORIZER);
Review comment:
This log line and the log message inside the method
"initiateAuthorizationDumpTask" look repetitive, even though they each print
different information. It would be better to remove this one and fold the
"RANGER_AUTHORIZER" detail into the log message inside
"initiateAuthorizationDumpTask".
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
##########
@@ -97,214 +100,257 @@ public StageType getType() {
   @Override
   public int execute() {
-    Task<?> rootTask = work.getRootTask();
-    if (rootTask != null) {
-      rootTask.setChildTasks(null);
+    try {
+      Task<?> rootTask = work.getRootTask();
+      if (rootTask != null) {
+        rootTask.setChildTasks(null);
+      }
+      work.setRootTask(this);
+      this.parentTasks = null;
+      if (shouldLoadAuthorizationMetadata()) {
+        LOG.info("Loading authorization data provided by service {} ", RANGER_AUTHORIZER);
+        initiateAuthorizationLoadTask(work.dumpDirectory);
Review comment:
No need to pass work.dumpDirectory; it is available there as well.
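
That is, the call site could become (a sketch, assuming the method reads the directory from the work object):

    if (shouldLoadAuthorizationMetadata()) {
      // No argument needed: the method reads work.dumpDirectory itself.
      initiateAuthorizationLoadTask();
    }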
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
##########
@@ -97,214 +100,257 @@ public StageType getType() {
   @Override
   public int execute() {
-    Task<?> rootTask = work.getRootTask();
-    if (rootTask != null) {
-      rootTask.setChildTasks(null);
+    try {
+      Task<?> rootTask = work.getRootTask();
+      if (rootTask != null) {
+        rootTask.setChildTasks(null);
+      }
+      work.setRootTask(this);
+      this.parentTasks = null;
+      if (shouldLoadAuthorizationMetadata()) {
+        LOG.info("Loading authorization data provided by service {} ", RANGER_AUTHORIZER);
Review comment:
Remove this log and fold the RANGER_AUTHORIZER detail into the log message
inside initiateAuthorizationLoadTask.
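
A minimal sketch of the folded message inside initiateAuthorizationLoadTask (the method body here is hypothetical; only the combined log line is the point):

    private void initiateAuthorizationLoadTask() {
      // Single log line carrying both the source directory and the authorizer name.
      LOG.info("Loading authorization data from {} provided by service {}",
          work.dumpDirectory, RANGER_AUTHORIZER);
      // existing task-creation logic continues here
    }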
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
##########
@@ -340,30 +387,30 @@ private void dropTablesExcludedInReplScope(ReplScope replScope) throws HiveExcep
     // List all the tables that are excluded in the current repl scope.
     Iterable<String> tableNames = Collections2.filter(db.getAllTables(dbName),
         tableName -> {
-      assert(tableName != null);
+      assert (tableName != null);
       return !tableName.toLowerCase().startsWith(
-          SemanticAnalyzer.VALUES_TMP_TABLE_NAME_PREFIX.toLowerCase())
-          && !replScope.tableIncludedInReplScope(tableName);
+              SemanticAnalyzer.VALUES_TMP_TABLE_NAME_PREFIX.toLowerCase())
+              && !replScope.tableIncludedInReplScope(tableName);
     });
     for (String table : tableNames) {
       db.dropTable(dbName + "." + table, true);
     }
     LOG.info("Tables in the Database: {} that are excluded in the replication scope are dropped.",
-        dbName);
+            dbName);
   }

   private void createReplLoadCompleteAckTask() {
     if ((work.isIncrementalLoad() && !work.incrementalLoadTasksBuilder().hasMoreWork() &&
         !work.hasBootstrapLoadTasks())
-        || (!work.isIncrementalLoad() && !work.hasBootstrapLoadTasks())) {
+            || (!work.isIncrementalLoad() && !work.hasBootstrapLoadTasks())) {
       //All repl load tasks are executed and status is 0, create the task to add the acknowledgement
       AckWork replLoadAckWork = new AckWork(
-          new Path(work.dumpDirectory, LOAD_ACKNOWLEDGEMENT.toString()));
+              new Path(work.dumpDirectory, LOAD_ACKNOWLEDGEMENT.toString()));
       Task<AckWork> loadAckWorkTask = TaskFactory.get(replLoadAckWork, conf);
-      if (this.childTasks.isEmpty()) {
+      if (childTasks.isEmpty()) {
         this.childTasks.add(loadAckWorkTask);
Review comment:
Remove the "this." qualifier here and from the next line as well.
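
That is (a sketch of the suggested form, based on the lines shown in this hunk):

        childTasks.add(loadAckWorkTask);
      } else {
        DAGTraversal.traverse(childTasks,
            new AddDependencyToLeaves(Collections.singletonList(loadAckWorkTask)));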
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
##########
@@ -392,7 +439,7 @@ private void createEndReplLogTask(Context context, Scope scope,
   /**
    * There was a database update done before and we want to make sure we update the last repl
    * id on this database as we are now going to switch to processing a new database.
-   *
+   * <p>
Review comment:
Remove this.
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
##########
@@ -340,30 +387,30 @@ private void dropTablesExcludedInReplScope(ReplScope replScope) throws HiveExcep
     // List all the tables that are excluded in the current repl scope.
     Iterable<String> tableNames = Collections2.filter(db.getAllTables(dbName),
         tableName -> {
-      assert(tableName != null);
+      assert (tableName != null);
       return !tableName.toLowerCase().startsWith(
-          SemanticAnalyzer.VALUES_TMP_TABLE_NAME_PREFIX.toLowerCase())
-          && !replScope.tableIncludedInReplScope(tableName);
+              SemanticAnalyzer.VALUES_TMP_TABLE_NAME_PREFIX.toLowerCase())
+              && !replScope.tableIncludedInReplScope(tableName);
     });
     for (String table : tableNames) {
       db.dropTable(dbName + "." + table, true);
     }
     LOG.info("Tables in the Database: {} that are excluded in the replication scope are dropped.",
-        dbName);
+            dbName);
   }

   private void createReplLoadCompleteAckTask() {
     if ((work.isIncrementalLoad() && !work.incrementalLoadTasksBuilder().hasMoreWork() &&
         !work.hasBootstrapLoadTasks())
-        || (!work.isIncrementalLoad() && !work.hasBootstrapLoadTasks())) {
+            || (!work.isIncrementalLoad() && !work.hasBootstrapLoadTasks())) {
       //All repl load tasks are executed and status is 0, create the task to add the acknowledgement
       AckWork replLoadAckWork = new AckWork(
-          new Path(work.dumpDirectory, LOAD_ACKNOWLEDGEMENT.toString()));
+              new Path(work.dumpDirectory, LOAD_ACKNOWLEDGEMENT.toString()));
       Task<AckWork> loadAckWorkTask = TaskFactory.get(replLoadAckWork, conf);
-      if (this.childTasks.isEmpty()) {
+      if (childTasks.isEmpty()) {
         this.childTasks.add(loadAckWorkTask);
       } else {
         DAGTraversal.traverse(this.childTasks,
-            new AddDependencyToLeaves(Collections.singletonList(loadAckWorkTask)));
+                new AddDependencyToLeaves(Collections.singletonList(loadAckWorkTask)));
Review comment:
You could rather fit this on a single line.
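
That is, something like the following, line length permitting (a sketch):

        DAGTraversal.traverse(this.childTasks, new AddDependencyToLeaves(Collections.singletonList(loadAckWorkTask)));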
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
##########
@@ -452,78 +499,65 @@ private void createBuilderTask(List<Task<?>> rootTasks) {
     DAGTraversal.traverse(rootTasks, new AddDependencyToLeaves(loadTask));
   }

-  private int executeIncrementalLoad() {
-    try {
-
-      // If replication policy is changed between previous and current repl load, then drop the tables
-      // that are excluded in the new replication policy.
-      dropTablesExcludedInReplScope(work.currentReplScope);
-
-      IncrementalLoadTasksBuilder builder = work.incrementalLoadTasksBuilder();
-
-      // If incremental events are already applied, then check and perform if need to bootstrap any tables.
-      if (!builder.hasMoreWork() && work.isLastReplIDUpdated()) {
-        if (work.hasBootstrapLoadTasks()) {
-          LOG.debug("Current incremental dump have tables to be bootstrapped. Switching to bootstrap "
-              + "mode after applying all events.");
-          return executeBootStrapLoad();
-        }
+  private int executeIncrementalLoad() throws Exception {
+    // If replication policy is changed between previous and current repl load, then drop the tables
+    // that are excluded in the new replication policy.
+    dropTablesExcludedInReplScope(work.currentReplScope);
+    IncrementalLoadTasksBuilder builder = work.incrementalLoadTasksBuilder();
+    // If incremental events are already applied, then check and perform if need to bootstrap any tables.
+    if (!builder.hasMoreWork() && work.isLastReplIDUpdated()) {
+      if (work.hasBootstrapLoadTasks()) {
+        LOG.debug("Current incremental dump have tables to be bootstrapped. Switching to bootstrap "
+            + "mode after applying all events.");
+        return executeBootStrapLoad();
       }
-
-      List<Task<?>> childTasks = new ArrayList<>();
-      int maxTasks = conf.getIntVar(HiveConf.ConfVars.REPL_APPROX_MAX_LOAD_TASKS);
-
-      TaskTracker tracker = new TaskTracker(maxTasks);
-      childTasks.add(builder.build(context, getHive(), LOG, tracker));
-
-      // If there are no more events to be applied, add a task to update the last.repl.id of the
-      // target database to the event id of the last event considered by the dump. Next
-      // incremental cycle won't consider the events in this dump again if it starts from this id.
-      if (!builder.hasMoreWork()) {
-        // The name of the database to be loaded into is either specified directly in REPL LOAD
-        // command i.e. when dbNameToLoadIn has a valid dbname or is available through dump
-        // metadata during table level replication.
-        String dbName = work.dbNameToLoadIn;
-        if (dbName == null || StringUtils.isBlank(dbName)) {
-          if (work.currentReplScope != null) {
-            String replScopeDbName = work.currentReplScope.getDbName();
-            if (replScopeDbName != null && !"*".equals(replScopeDbName)) {
-              dbName = replScopeDbName;
-            }
+    }
+    List<Task<?>> childTasks = new ArrayList<>();
+    int maxTasks = conf.getIntVar(HiveConf.ConfVars.REPL_APPROX_MAX_LOAD_TASKS);
+    TaskTracker tracker = new TaskTracker(maxTasks);
+    childTasks.add(builder.build(context, getHive(), LOG, tracker));
+    // If there are no more events to be applied, add a task to update the last.repl.id of the
+    // target database to the event id of the last event considered by the dump. Next
+    // incremental cycle won't consider the events in this dump again if it starts from this id.
+    if (!builder.hasMoreWork()) {
+      // The name of the database to be loaded into is either specified directly in REPL LOAD
+      // command i.e. when dbNameToLoadIn has a valid dbname or is available through dump
+      // metadata during table level replication.
+      String dbName = work.dbNameToLoadIn;
+      if (dbName == null || StringUtils.isBlank(dbName)) {
+        if (work.currentReplScope != null) {
+          String replScopeDbName = work.currentReplScope.getDbName();
+          if (replScopeDbName != null && !"*".equals(replScopeDbName)) {
+            dbName = replScopeDbName;
           }
         }
-
-        // If we are replicating to multiple databases at a time, it's not
-        // possible to know which all databases we are replicating into and hence we can not
-        // update repl id in all those databases.
-        if (StringUtils.isNotBlank(dbName)) {
-          String lastEventid = builder.eventTo().toString();
-          Map<String, String> mapProp = new HashMap<>();
-          mapProp.put(ReplicationSpec.KEY.CURR_STATE_ID.toString(), lastEventid);
-
-          AlterDatabaseSetPropertiesDesc alterDbDesc =
-              new AlterDatabaseSetPropertiesDesc(dbName, mapProp,
-                  new ReplicationSpec(lastEventid, lastEventid));
-          Task<?> updateReplIdTask =
-              TaskFactory.get(new DDLWork(new HashSet<>(), new HashSet<>(), alterDbDesc), conf);
-
-          DAGTraversal.traverse(childTasks, new AddDependencyToLeaves(updateReplIdTask));
-          work.setLastReplIDUpdated(true);
-          LOG.debug("Added task to set last repl id of db " + dbName + " to " + lastEventid);
-        }
       }
-
-      // Once all the incremental events are applied, enable bootstrap of tables if exist.
-      if (builder.hasMoreWork() || work.hasBootstrapLoadTasks()) {
-        DAGTraversal.traverse(childTasks, new AddDependencyToLeaves(TaskFactory.get(work, conf)));
+      // If we are replicating to multiple databases at a time, it's not
+      // possible to know which all databases we are replicating into and hence we can not
+      // update repl id in all those databases.
+      if (StringUtils.isNotBlank(dbName)) {
+        String lastEventid = builder.eventTo().toString();
+        Map<String, String> mapProp = new HashMap<>();
+        mapProp.put(ReplicationSpec.KEY.CURR_STATE_ID.toString(), lastEventid);
+        AlterDatabaseSetPropertiesDesc alterDbDesc =
+            new AlterDatabaseSetPropertiesDesc(dbName, mapProp,
+                new ReplicationSpec(lastEventid, lastEventid));
+        Task<?> updateReplIdTask =
+            TaskFactory.get(new DDLWork(new HashSet<>(), new HashSet<>(), alterDbDesc), conf);
+        DAGTraversal.traverse(childTasks, new AddDependencyToLeaves(updateReplIdTask));
+        work.setLastReplIDUpdated(true);
+        LOG.debug("Added task to set last repl id of db " + dbName + " to " + lastEventid);
      }
-      this.childTasks = childTasks;
-      createReplLoadCompleteAckTask();
-      return 0;
-    } catch (Exception e) {
-      LOG.error("failed replication", e);
-      setException(e);
-      return 1;
    }
+    // Once all the incremental events are applied, enable bootstrap of tables if exist.
+    if (builder.hasMoreWork() || work.hasBootstrapLoadTasks()) {
+      DAGTraversal.traverse(childTasks, new AddDependencyToLeaves(TaskFactory.get(work, conf)));
+    }
+    if (this.childTasks == null) {
Review comment:
Remove the "this." qualifier.
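
That is (a sketch of the suggested form):

    if (childTasks == null) {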
##########
File path: ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestRangerLoadTask.java
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.repl;
+
+import com.google.gson.Gson;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.repl.ranger.RangerExportPolicyList;
+import org.apache.hadoop.hive.ql.exec.repl.ranger.RangerRestClientImpl;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_AUTHORIZATION_PROVIDER_SERVICE_ENDPOINT;
+
+/**
+ * Unit test class for testing Ranger Load.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class TestRangerLoadTask {
+
+  protected static final Logger LOG = LoggerFactory.getLogger(TestRangerLoadTask.class);
+  private RangerLoadTask task;
+
+  @Mock
+  private RangerRestClientImpl mockClient;
+
+  @Mock
+  private HiveConf conf;
+
+  @Mock
+  private RangerLoadWork work;
+
+  @Before
+  public void setup() throws Exception {
+    task = new RangerLoadTask(mockClient, conf, work);
+    Mockito.when(mockClient.changeDataSet(Mockito.anyList(), Mockito.anyString(), Mockito.anyString()))
+        .thenCallRealMethod();
+    Mockito.when(mockClient.checkConnection(Mockito.anyString())).thenReturn(true);
+  }
+
+  @Test
+  public void testFailureInvalidAuthProviderEndpoint() {
+    Mockito.when(conf.getVar(REPL_AUTHORIZATION_PROVIDER_SERVICE_ENDPOINT)).thenReturn(null);
+    int status = task.execute();
+    Assert.assertEquals(40000, status);
+  }
+
+  @Test
+  public void testSuccessValidAuthProviderEndpoint() {
+    Mockito.when(conf.getVar(REPL_AUTHORIZATION_PROVIDER_SERVICE_ENDPOINT)).thenReturn("rangerEndpoint");
+    Mockito.when(work.getSourceDbName()).thenReturn("srcdb");
+    Mockito.when(work.getTargetDbName()).thenReturn("tgtdb");
+    int status = task.execute();
+    Assert.assertEquals(0, status);
+  }
+
+  @Test
+  public void testSuccessNonEmptyRangerPolicies() throws Exception {
+    String rangerResponse = "{\"metaDataInfo\":{\"Host name\":\"org.apache.ranger.com\","
Review comment:
ranger.apache.org?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 432643)
Time Spent: 2h 50m (was: 2h 40m)
> Ranger Replication Scheduling
> -----------------------------
>
> Key: HIVE-23351
> URL: https://issues.apache.org/jira/browse/HIVE-23351
> Project: Hive
> Issue Type: Task
> Reporter: Aasha Medhi
> Assignee: Aasha Medhi
> Priority: Major
> Labels: pull-request-available
> Attachments: HIVE-23351.01.patch, HIVE-23351.02.patch,
> HIVE-23351.03.patch, HIVE-23351.04.patch, HIVE-23351.05.patch,
> HIVE-23351.06.patch, HIVE-23351.07.patch, HIVE-23351.08.patch,
> HIVE-23351.09.patch, HIVE-23351.10.patch, HIVE-23351.10.patch,
> HIVE-23351.11.patch
>
> Time Spent: 2h 50m
> Remaining Estimate: 0h
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)