This is an automated email from the ASF dual-hosted git repository. anishek pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push: new 1c1336f HIVE-23353: Atlas metadata replication scheduling (Pravin Kumar Sinha, reviewed by Aasha Medhi) 1c1336f is described below commit 1c1336f712c9a9a492c3c4e32993758026630484 Author: Anishek Agarwal <anis...@gmail.com> AuthorDate: Fri May 29 15:38:17 2020 +0530 HIVE-23353: Atlas metadata replication scheduling (Pravin Kumar Sinha, reviewed by Aasha Medhi) --- .../java/org/apache/hadoop/hive/conf/HiveConf.java | 10 + .../TestReplicationScenariosAcrossInstances.java | 145 +++++++++++++++ pom.xml | 2 + ql/if/queryplan.thrift | 4 +- ql/pom.xml | 50 +++++ ql/src/gen/thrift/gen-cpp/queryplan_types.cpp | 10 +- ql/src/gen/thrift/gen-cpp/queryplan_types.h | 4 +- .../apache/hadoop/hive/ql/plan/api/StageType.java | 8 +- ql/src/gen/thrift/gen-php/Types.php | 4 + ql/src/gen/thrift/gen-py/queryplan/ttypes.py | 6 + ql/src/gen/thrift/gen-rb/queryplan_types.rb | 6 +- .../apache/hadoop/hive/ql/exec/TaskFactory.java | 6 + .../hadoop/hive/ql/exec/repl/AtlasDumpTask.java | 204 +++++++++++++++++++++ .../hadoop/hive/ql/exec/repl/AtlasDumpWork.java | 61 ++++++ .../hadoop/hive/ql/exec/repl/AtlasLoadTask.java | 137 ++++++++++++++ .../hadoop/hive/ql/exec/repl/AtlasLoadWork.java | 58 ++++++ .../hadoop/hive/ql/exec/repl/ReplDumpTask.java | 19 ++ .../hadoop/hive/ql/exec/repl/ReplLoadTask.java | 20 +- .../hive/ql/exec/repl/atlas/AtlasReplInfo.java | 101 ++++++++++ .../ql/exec/repl/atlas/AtlasRequestBuilder.java | 192 +++++++++++++++++++ .../hive/ql/exec/repl/atlas/AtlasRestClient.java | 43 +++++ .../ql/exec/repl/atlas/AtlasRestClientBuilder.java | 97 ++++++++++ .../ql/exec/repl/atlas/AtlasRestClientImpl.java | 175 ++++++++++++++++++ .../ql/exec/repl/atlas/NoOpAtlasRestClient.java | 57 ++++++ .../hive/ql/exec/repl/atlas/RetryingClient.java | 92 ++++++++++ .../hadoop/hive/ql/exec/repl/util/ReplUtils.java | 23 +++ .../hadoop/hive/ql/parse/repl/dump/Utils.java | 31 ++++ 27 files changed, 1556 insertions(+), 9 deletions(-) 
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 281c4e2..abd12c9 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -539,6 +539,16 @@ public class HiveConf extends Configuration { true, "This configuration will add a deny policy on the target database for all users except hive" + " to avoid any update to the target database"), + REPL_INCLUDE_ATLAS_METADATA("hive.repl.include.atlas.metadata", false, + "Indicates if Atlas metadata should be replicated along with Hive data and metadata or not."), + REPL_ATLAS_ENDPOINT("hive.repl.atlas.endpoint", null, + "Atlas endpoint of the current cluster hive database is getting replicated from/to."), + REPL_ATLAS_REPLICATED_TO_DB("hive.repl.atlas.replicatedto", null, + "Target hive database name Atlas metadata of source hive database is being replicated to."), + REPL_SOURCE_CLUSTER_NAME("hive.repl.source.cluster.name", null, + "Name of the source cluster for the replication."), + REPL_TARGET_CLUSTER_NAME("hive.repl.target.cluster.name", null, + "Name of the target cluster for the replication."), LOCALSCRATCHDIR("hive.exec.local.scratchdir", "${system:java.io.tmpdir}" + File.separator + "${system:user.name}", "Local scratch space for Hive jobs"), diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java index 65f7303..d7b360c 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java @@ -43,11 +43,18 @@ import org.junit.BeforeClass; import org.junit.Test; import javax.annotation.Nullable; +import 
java.io.BufferedReader; import java.io.File; import java.io.IOException; +import java.io.InputStreamReader; +import java.net.MalformedURLException; import java.net.URI; import java.net.URISyntaxException; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Base64; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -1538,4 +1545,142 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro assertEquals("Authorizer sentry not supported for replication ", e.getMessage()); } } + + //Testing just the configs and no impact on existing replication + @Test + public void testAtlasReplication() throws Throwable { + Map<String, String> confMap = defaultAtlasConfMap(); + primary.run("use " + primaryDbName) + .run("create table acid_table (key int, value int) partitioned by (load_date date) " + + "clustered by(key) into 2 buckets stored as orc tblproperties ('transactional'='true')") + .run("create table table1 (i String)") + .run("insert into table1 values (1)") + .run("insert into table1 values (2)") + .dump(primaryDbName, getAtlasClause(defaultAtlasConfMap())); + verifyAtlasMetadataPresent(); + + confMap.remove("hive.repl.atlas.replicatedto"); + replica.load(replicatedDbName, primaryDbName, getAtlasClause(confMap)) + .run("use " + replicatedDbName) + .run("show tables") + .verifyResults(new String[] {"acid_table", "table1"}) + .run("select * from table1") + .verifyResults(new String[] {"1", "2"}); + } + + @Test + public void testAtlasMissingConfigs() throws Throwable { + primary.run("use " + primaryDbName) + .run("create table acid_table (key int, value int) partitioned by (load_date date) " + + "clustered by(key) into 2 buckets stored as orc tblproperties ('transactional'='true')") + .run("create table table1 (i String)") + .run("insert into table1 values (1)") + .run("insert into table1 values (2)"); + Map<String, String> 
confMap = new HashMap<>(); + confMap.put(HiveConf.ConfVars.HIVE_IN_TEST.varname, "true"); + confMap.put(HiveConf.ConfVars.REPL_INCLUDE_ATLAS_METADATA.varname, "true"); + ensureFailedReplOperation(getAtlasClause(confMap), HiveConf.ConfVars.REPL_ATLAS_ENDPOINT.varname, true); + confMap.put(HiveConf.ConfVars.REPL_ATLAS_ENDPOINT.varname, "InvalidURL:atlas"); + ensureInvalidUrl(getAtlasClause(confMap), HiveConf.ConfVars.REPL_ATLAS_ENDPOINT.varname, true); + confMap.put(HiveConf.ConfVars.REPL_ATLAS_ENDPOINT.varname, "http://localhost:21000/atlas"); + ensureFailedReplOperation(getAtlasClause(confMap), HiveConf.ConfVars.REPL_ATLAS_REPLICATED_TO_DB.varname, true); + confMap.put(HiveConf.ConfVars.REPL_ATLAS_REPLICATED_TO_DB.varname, replicatedDbName); + ensureFailedReplOperation(getAtlasClause(confMap), HiveConf.ConfVars.REPL_SOURCE_CLUSTER_NAME.varname, true); + confMap.put(HiveConf.ConfVars.REPL_SOURCE_CLUSTER_NAME.varname, "cluster0"); + ensureFailedReplOperation(getAtlasClause(confMap), HiveConf.ConfVars.REPL_TARGET_CLUSTER_NAME.varname, true); + confMap.put(HiveConf.ConfVars.REPL_TARGET_CLUSTER_NAME.varname, "cluster1"); + primary.dump(primaryDbName, getAtlasClause(confMap)); + verifyAtlasMetadataPresent(); + + confMap.clear(); + confMap.put(HiveConf.ConfVars.HIVE_IN_TEST.varname, "true"); + confMap.put(HiveConf.ConfVars.REPL_INCLUDE_ATLAS_METADATA.varname, "true"); + ensureFailedReplOperation(getAtlasClause(confMap), HiveConf.ConfVars.REPL_ATLAS_ENDPOINT.varname, false); + confMap.put(HiveConf.ConfVars.REPL_ATLAS_ENDPOINT.varname, "InvalidURL:atlas"); + ensureInvalidUrl(getAtlasClause(confMap), HiveConf.ConfVars.REPL_ATLAS_ENDPOINT.varname, false); + confMap.put(HiveConf.ConfVars.REPL_ATLAS_ENDPOINT.varname, "http://localhost:21000/atlas"); + ensureFailedReplOperation(getAtlasClause(confMap), HiveConf.ConfVars.REPL_SOURCE_CLUSTER_NAME.varname, false); + confMap.put(HiveConf.ConfVars.REPL_SOURCE_CLUSTER_NAME.varname, "cluster0"); + 
ensureFailedReplOperation(getAtlasClause(confMap), HiveConf.ConfVars.REPL_TARGET_CLUSTER_NAME.varname, false); + confMap.put(HiveConf.ConfVars.REPL_TARGET_CLUSTER_NAME.varname, "cluster1"); + primary.load(replicatedDbName, primaryDbName, getAtlasClause(confMap)); + } + + private void ensureInvalidUrl(List<String> atlasClause, String endpoint, boolean dump) throws Throwable { + try { + if (dump) { + primary.dump(primaryDbName, atlasClause); + } else { + primary.load(replicatedDbName, primaryDbName, atlasClause); + } + } catch (MalformedURLException e) { + return; + } + Assert.fail("Atlas endpoint is invalid and but test didn't fail:" + endpoint); + } + + private void verifyAtlasMetadataPresent() throws IOException { + Path dbReplDir = new Path(primary.repldDir, + Base64.getEncoder().encodeToString(primaryDbName.toLowerCase().getBytes(StandardCharsets.UTF_8.name()))); + FileSystem fs = FileSystem.get(dbReplDir.toUri(), primary.getConf()); + assertTrue(fs.exists(dbReplDir)); + FileStatus[] dumpRoots = fs.listStatus(dbReplDir); + assert(dumpRoots.length == 1); + Path dumpRoot = dumpRoots[0].getPath(); + assertTrue("Hive dump root doesn't exist", fs.exists(new Path(dumpRoot, ReplUtils.REPL_HIVE_BASE_DIR))); + Path atlasDumpRoot = new Path(dumpRoot, ReplUtils.REPL_ATLAS_BASE_DIR); + assertTrue("Atlas dump root doesn't exist", fs.exists(atlasDumpRoot)); + assertTrue("Atlas export file doesn't exist", + fs.exists(new Path(atlasDumpRoot, ReplUtils.REPL_ATLAS_EXPORT_FILE_NAME))); + assertTrue("Atlas dump metadata doesn't exist", + fs.exists(new Path(atlasDumpRoot, EximUtil.METADATA_NAME))); + BufferedReader br = null; + try { + br = new BufferedReader(new InputStreamReader( + fs.open(new Path(atlasDumpRoot, EximUtil.METADATA_NAME)), Charset.defaultCharset())); + String[] lineContents = br.readLine().split("\t", 5); + assertEquals(primary.hiveConf.get("fs.defaultFS"), lineContents[0]); + assertEquals(0, Long.parseLong(lineContents[1])); + } finally { + if (br != null) { + 
br.close(); + } + } + } + + private void ensureFailedReplOperation(List<String> clause, String conf, boolean dump) throws Throwable { + try { + if (dump) { + primary.dump(primaryDbName, clause); + } else { + primary.load(replicatedDbName, primaryDbName, clause); + } + Assert.fail(conf + " is mandatory config for Atlas metadata replication but it didn't fail."); + } catch (SemanticException e) { + assertEquals(e.getMessage(), (conf + " is mandatory config for Atlas metadata replication")); + } + } + + private Map<String, String> defaultAtlasConfMap() { + Map<String, String> confMap = new HashMap<>(); + confMap.put(HiveConf.ConfVars.HIVE_IN_TEST.varname, "true"); + confMap.put(HiveConf.ConfVars.REPL_INCLUDE_ATLAS_METADATA.varname, "true"); + confMap.put(HiveConf.ConfVars.REPL_ATLAS_ENDPOINT.varname, "http://localhost:21000/atlas"); + confMap.put(HiveConf.ConfVars.REPL_ATLAS_REPLICATED_TO_DB.varname, replicatedDbName); + confMap.put(HiveConf.ConfVars.REPL_SOURCE_CLUSTER_NAME.varname, "cluster0"); + confMap.put(HiveConf.ConfVars.REPL_TARGET_CLUSTER_NAME.varname, "cluster1"); + return confMap; + } + + private List<String> getAtlasClause(Map<String, String> confMap) { + List confList = new ArrayList(); + for (Map.Entry<String, String> entry:confMap.entrySet()) { + confList.add(quote(entry.getKey()) + "=" + quote(entry.getValue())); + } + return confList; + } + + private String quote(String str) { + return "'" + str + "'"; + } } diff --git a/pom.xml b/pom.xml index b4b41ea..cedcb2e 100644 --- a/pom.xml +++ b/pom.xml @@ -112,6 +112,7 @@ <apache-directory-server.version>1.5.7</apache-directory-server.version> <!-- Include arrow for LlapOutputFormatService --> <arrow.version>0.10.0</arrow.version> + <atlas.client.version>2.0.0</atlas.client.version> <avatica.version>1.12.0</avatica.version> <avro.version>1.8.2</avro.version> <calcite.version>1.21.0</calcite.version> @@ -123,6 +124,7 @@ <commons-codec.version>1.7</commons-codec.version> 
<commons-collections.version>3.2.2</commons-collections.version> <commons-compress.version>1.19</commons-compress.version> + <commons-configuration.version>1.10</commons-configuration.version> <commons-exec.version>1.1</commons-exec.version> <commons-io.version>2.6</commons-io.version> <commons-lang3.version>3.9</commons-lang3.version> diff --git a/ql/if/queryplan.thrift b/ql/if/queryplan.thrift index d7d7cdc..91bff61 100644 --- a/ql/if/queryplan.thrift +++ b/ql/if/queryplan.thrift @@ -109,7 +109,9 @@ enum StageType { SCHEDULED_QUERY_MAINT, ACK, RANGER_DUMP, - RANGER_LOAD + RANGER_LOAD, + ATLAS_DUMP, + ATLAS_LOAD } struct Stage { diff --git a/ql/pom.xml b/ql/pom.xml index 816b400..689d2d1 100644 --- a/ql/pom.xml +++ b/ql/pom.xml @@ -38,6 +38,56 @@ <!-- intra-project --> <!-- used for vector code-gen --> <dependency> + <groupId>org.apache.atlas</groupId> + <artifactId>atlas-client-v2</artifactId> + <version>${atlas.client.version}</version> + <exclusions> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </exclusion> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>jul-to-slf4j</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.atlas</groupId> + <artifactId>atlas-client-common</artifactId> + <version>${atlas.client.version}</version> + <exclusions> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </exclusion> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>jul-to-slf4j</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.atlas</groupId> + <artifactId>atlas-intg</artifactId> + <version>${atlas.client.version}</version> + <exclusions> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </exclusion> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>jul-to-slf4j</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + 
<groupId>commons-configuration</groupId> + <artifactId>commons-configuration</artifactId> + <version>${commons-configuration.version}</version> + </dependency> + <dependency> <groupId>org.apache.hive</groupId> <artifactId>hive-vector-code-gen</artifactId> <version>${project.version}</version> diff --git a/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp b/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp index 00847d5..c7338e1 100644 --- a/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp +++ b/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp @@ -126,7 +126,9 @@ int _kStageTypeValues[] = { StageType::SCHEDULED_QUERY_MAINT, StageType::ACK, StageType::RANGER_DUMP, - StageType::RANGER_LOAD + StageType::RANGER_LOAD, + StageType::ATLAS_DUMP, + StageType::ATLAS_LOAD }; const char* _kStageTypeNames[] = { "CONDITIONAL", @@ -149,9 +151,11 @@ const char* _kStageTypeNames[] = { "SCHEDULED_QUERY_MAINT", "ACK", "RANGER_DUMP", - "RANGER_LOAD" + "RANGER_LOAD", + "ATLAS_DUMP", + "ATLAS_LOAD }; -const std::map<int, const char*> _StageType_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(21, _kStageTypeValues, _kStageTypeNames), ::apache::thrift::TEnumIterator(-1, NULL, NULL)); +const std::map<int, const char*> _StageType_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(23, _kStageTypeValues, _kStageTypeNames), ::apache::thrift::TEnumIterator(-1, NULL, NULL)); Adjacency::~Adjacency() throw() { diff --git a/ql/src/gen/thrift/gen-cpp/queryplan_types.h b/ql/src/gen/thrift/gen-cpp/queryplan_types.h index b3d9cf7..1e8c702 100644 --- a/ql/src/gen/thrift/gen-cpp/queryplan_types.h +++ b/ql/src/gen/thrift/gen-cpp/queryplan_types.h @@ -103,7 +103,9 @@ struct StageType { SCHEDULED_QUERY_MAINT = 17, ACK = 18, RANGER_DUMP = 19, - RANGER_LOAD = 20 + RANGER_LOAD = 20, + ATLAS_DUMP = 21, + ATLAS_LOAD = 22 }; }; diff --git a/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java b/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java index 31c3429..eba0230 100644 
--- a/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java +++ b/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java @@ -32,7 +32,9 @@ public enum StageType implements org.apache.thrift.TEnum { SCHEDULED_QUERY_MAINT(17), ACK(18), RANGER_DUMP(19), - RANGER_LOAD(20); + RANGER_LOAD(20), + ATLAS_DUMP(21), + ATLAS_LOAD(22); private final int value; @@ -95,6 +97,10 @@ public enum StageType implements org.apache.thrift.TEnum { return RANGER_DUMP; case 20: return RANGER_LOAD; + case 21: + return ATLAS_DUMP; + case 22: + return ATLAS_LOAD; default: return null; } diff --git a/ql/src/gen/thrift/gen-php/Types.php b/ql/src/gen/thrift/gen-php/Types.php index d805d21..7cb3bea 100644 --- a/ql/src/gen/thrift/gen-php/Types.php +++ b/ql/src/gen/thrift/gen-php/Types.php @@ -125,6 +125,8 @@ final class StageType { const ACK = 18; const RANGER_DUMP = 19; const RANGER_LOAD = 20; + const ATLAS_DUMP = 21; + const ATLAS_LOAD = 22; static public $__names = array( 0 => 'CONDITIONAL', 1 => 'COPY', @@ -147,6 +149,8 @@ final class StageType { 18 => 'ACK', 19 => 'RANGER_DUMP', 20 => 'RANGER_LOAD', + 21 => 'ATLAS_DUMP', + 22 => 'ATLAS_LOAD', ); } diff --git a/ql/src/gen/thrift/gen-py/queryplan/ttypes.py b/ql/src/gen/thrift/gen-py/queryplan/ttypes.py index 33a53ce..39a20c3 100644 --- a/ql/src/gen/thrift/gen-py/queryplan/ttypes.py +++ b/ql/src/gen/thrift/gen-py/queryplan/ttypes.py @@ -172,6 +172,8 @@ class StageType: ACK = 18 RANGER_DUMP = 19 RANGER_LOAD = 20 + ATLAS_DUMP = 21 + ATLAS_LOAD = 22 _VALUES_TO_NAMES = { 0: "CONDITIONAL", @@ -195,6 +197,8 @@ class StageType: 18: "ACK", 19: "RANGER_DUMP", 20: "RANGER_LOAD", + 21: "ATLAS_DUMP", + 22: "ATLAS_LOAD", } _NAMES_TO_VALUES = { @@ -219,6 +223,8 @@ class StageType: "ACK": 18, "RANGER_DUMP": 19, "RANGER_LOAD": 20, + "ATLAS_DUMP": 21, + "ATLAS_LOAD": 22, } diff --git a/ql/src/gen/thrift/gen-rb/queryplan_types.rb b/ql/src/gen/thrift/gen-rb/queryplan_types.rb index d31ba3c..0e827b2 100644 --- 
a/ql/src/gen/thrift/gen-rb/queryplan_types.rb +++ b/ql/src/gen/thrift/gen-rb/queryplan_types.rb @@ -82,8 +82,10 @@ module StageType ACK = 18 RANGER_DUMP = 19 RANGER_LOAD = 20 - VALUE_MAP = {0 => "CONDITIONAL", 1 => "COPY", 2 => "DDL", 3 => "MAPRED", 4 => "EXPLAIN", 5 => "FETCH", 6 => "FUNC", 7 => "MAPREDLOCAL", 8 => "MOVE", 9 => "STATS", 10 => "DEPENDENCY_COLLECTION", 11 => "COLUMNSTATS", 12 => "REPL_DUMP", 13 => "REPL_BOOTSTRAP_LOAD", 14 => "REPL_STATE_LOG", 15 => "REPL_TXN", 16 => "REPL_INCREMENTAL_LOAD", 17 => "SCHEDULED_QUERY_MAINT", 18 => "ACK", 19 => "RANGER_DUMP", 20 => "RANGER_LOAD"} - VALID_VALUES = Set.new([CONDITIONAL, COPY, DDL, MAPRED, EXPLAIN, FETCH, FUNC, MAPREDLOCAL, MOVE, STATS, DEPENDENCY_COLLECTION, COLUMNSTATS, REPL_DUMP, REPL_BOOTSTRAP_LOAD, REPL_STATE_LOG, REPL_TXN, REPL_INCREMENTAL_LOAD, SCHEDULED_QUERY_MAINT, ACK, RANGER_DUMP, RANGER_LOAD]).freeze + ATLAS_DUMP = 21 + ATLAS_LOAD = 22 + VALUE_MAP = {0 => "CONDITIONAL", 1 => "COPY", 2 => "DDL", 3 => "MAPRED", 4 => "EXPLAIN", 5 => "FETCH", 6 => "FUNC", 7 => "MAPREDLOCAL", 8 => "MOVE", 9 => "STATS", 10 => "DEPENDENCY_COLLECTION", 11 => "COLUMNSTATS", 12 => "REPL_DUMP", 13 => "REPL_BOOTSTRAP_LOAD", 14 => "REPL_STATE_LOG", 15 => "REPL_TXN", 16 => "REPL_INCREMENTAL_LOAD", 17 => "SCHEDULED_QUERY_MAINT", 18 => "ACK", 19 => "RANGER_DUMP", 20 => "RANGER_LOAD", 21 => "ATLAS_DUMP", 22 => "ATLAS_LOAD"} + VALID_VALUES = Set.new([CONDITIONAL, COPY, DDL, MAPRED, EXPLAIN, FETCH, FUNC, MAPREDLOCAL, MOVE, STATS, DEPENDENCY_COLLECTION, COLUMNSTATS, REPL_DUMP, REPL_BOOTSTRAP_LOAD, REPL_STATE_LOG, REPL_TXN, REPL_INCREMENTAL_LOAD, SCHEDULED_QUERY_MAINT, ACK, RANGER_DUMP, RANGER_LOAD, ATLAS_DUMP, ATLAS_LOAD]).freeze end class Adjacency diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java index f0e5461..fd2715d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java +++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java @@ -27,6 +27,10 @@ import org.apache.hadoop.hive.ql.ddl.DDLTask; import org.apache.hadoop.hive.ql.ddl.DDLWork; import org.apache.hadoop.hive.ql.exec.mr.MapRedTask; import org.apache.hadoop.hive.ql.exec.mr.MapredLocalTask; +import org.apache.hadoop.hive.ql.exec.repl.AtlasDumpTask; +import org.apache.hadoop.hive.ql.exec.repl.AtlasDumpWork; +import org.apache.hadoop.hive.ql.exec.repl.AtlasLoadTask; +import org.apache.hadoop.hive.ql.exec.repl.AtlasLoadWork; import org.apache.hadoop.hive.ql.exec.repl.ReplDumpTask; import org.apache.hadoop.hive.ql.exec.repl.ReplDumpWork; import org.apache.hadoop.hive.ql.exec.repl.ReplLoadTask; @@ -120,6 +124,8 @@ public final class TaskFactory { taskvec.add(new TaskTuple<AckWork>(AckWork.class, AckTask.class)); taskvec.add(new TaskTuple<RangerDumpWork>(RangerDumpWork.class, RangerDumpTask.class)); taskvec.add(new TaskTuple<RangerLoadWork>(RangerLoadWork.class, RangerLoadTask.class)); + taskvec.add(new TaskTuple<AtlasDumpWork>(AtlasDumpWork.class, AtlasDumpTask.class)); + taskvec.add(new TaskTuple<AtlasLoadWork>(AtlasLoadWork.class, AtlasLoadTask.class)); taskvec.add(new TaskTuple<ExportWork>(ExportWork.class, ExportTask.class)); taskvec.add(new TaskTuple<ReplTxnWork>(ReplTxnWork.class, ReplTxnTask.class)); taskvec.add(new TaskTuple<DirCopyWork>(DirCopyWork.class, DirCopyTask.class)); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java new file mode 100644 index 0000000..26cdc6b --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.repl; + +import org.apache.atlas.model.impexp.AtlasExportRequest; +import org.apache.atlas.model.impexp.AtlasServer; +import org.apache.atlas.model.instance.AtlasObjectId; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.ErrorMsg; +import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasReplInfo; +import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasRequestBuilder; +import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasRestClient; +import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasRestClientBuilder; +import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; +import org.apache.hadoop.hive.ql.parse.EximUtil; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.parse.repl.dump.Utils; +import org.apache.hadoop.hive.ql.plan.api.StageType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.Serializable; +import java.net.MalformedURLException; +import java.net.URL; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import 
java.util.Set; + +/** + * Atlas Metadata Replication Dump Task. + **/ +public class AtlasDumpTask extends Task<AtlasDumpWork> implements Serializable { + + private static final transient Logger LOG = LoggerFactory.getLogger(AtlasDumpTask.class); + private static final long serialVersionUID = 1L; + private transient AtlasRestClient atlasRestClient; + + @Override + public int execute() { + try { + AtlasReplInfo atlasReplInfo = createAtlasReplInfo(); + LOG.info("Dumping Atlas metadata of srcDb: {}, for TgtDb: {} to staging location:", + atlasReplInfo.getSrcDB(), atlasReplInfo.getTgtDB(), atlasReplInfo.getStagingDir()); + atlasRestClient = new AtlasRestClientBuilder(atlasReplInfo.getAtlasEndpoint()) + .getClient(atlasReplInfo.getConf()); + AtlasRequestBuilder atlasRequestBuilder = new AtlasRequestBuilder(); + String entityGuid = checkHiveEntityGuid(atlasRequestBuilder, atlasReplInfo.getSrcCluster(), + atlasReplInfo.getSrcDB()); + long currentModifiedTime = getCurrentTimestamp(atlasReplInfo, entityGuid); + dumpAtlasMetaData(atlasRequestBuilder, atlasReplInfo); + createDumpMetadata(atlasReplInfo, currentModifiedTime); + return 0; + } catch (Exception e) { + LOG.error("Exception while dumping atlas metadata", e); + setException(e); + return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode(); + } + } + + private AtlasReplInfo createAtlasReplInfo() throws SemanticException, MalformedURLException { + String errorFormat = "%s is mandatory config for Atlas metadata replication"; + //Also validates URL for endpoint. 
+ String endpoint = new URL(ReplUtils.getNonEmpty(HiveConf.ConfVars.REPL_ATLAS_ENDPOINT.varname, conf, errorFormat)) + .toString(); + String tgtDB = ReplUtils.getNonEmpty(HiveConf.ConfVars.REPL_ATLAS_REPLICATED_TO_DB.varname, conf, errorFormat); + String srcCluster = ReplUtils.getNonEmpty(HiveConf.ConfVars.REPL_SOURCE_CLUSTER_NAME.varname, conf, errorFormat); + String tgtCluster = ReplUtils.getNonEmpty(HiveConf.ConfVars.REPL_TARGET_CLUSTER_NAME.varname, conf, errorFormat); + AtlasReplInfo atlasReplInfo = new AtlasReplInfo(endpoint, work.getSrcDB(), tgtDB, srcCluster, + tgtCluster, work.getStagingDir(), conf); + atlasReplInfo.setSrcFsUri(conf.get(ReplUtils.DEFAULT_FS_CONFIG)); + long lastTimeStamp = work.isBootstrap() ? 0L : lastStoredTimeStamp(); + atlasReplInfo.setTimeStamp(lastTimeStamp); + return atlasReplInfo; + } + + private long lastStoredTimeStamp() throws SemanticException { + Path prevMetadataPath = new Path(work.getPrevAtlasDumpDir(), EximUtil.METADATA_NAME); + BufferedReader br = null; + try { + FileSystem fs = prevMetadataPath.getFileSystem(conf); + br = new BufferedReader(new InputStreamReader(fs.open(prevMetadataPath), Charset.defaultCharset())); + String line = br.readLine(); + if (line == null) { + throw new SemanticException("Could not read lastStoredTimeStamp from atlas metadata file"); + } + String[] lineContents = line.split("\t", 5); + return Long.parseLong(lineContents[1]); + } catch (Exception ex) { + throw new SemanticException(ex); + } finally { + if (br != null) { + try { + br.close(); + } catch (IOException e) { + throw new SemanticException(e); + } + } + } + } + + private long getCurrentTimestamp(AtlasReplInfo atlasReplInfo, String entityGuid) throws SemanticException { + AtlasServer atlasServer = atlasRestClient.getServer(atlasReplInfo.getSrcCluster()); + long ret = (atlasServer == null || atlasServer.getAdditionalInfoRepl(entityGuid) == null) + ? 
0L : (long) atlasServer.getAdditionalInfoRepl(entityGuid); + LOG.debug("Current timestamp is: {}", ret); + return ret; + } + + private void dumpAtlasMetaData(AtlasRequestBuilder atlasRequestBuilder, AtlasReplInfo atlasReplInfo) + throws SemanticException { + InputStream inputStream = null; + try { + AtlasExportRequest exportRequest = atlasRequestBuilder.createExportRequest(atlasReplInfo, + atlasReplInfo.getSrcCluster()); + inputStream = atlasRestClient.exportData(exportRequest); + FileSystem fs = FileSystem.get(atlasReplInfo.getStagingDir().toUri(), atlasReplInfo.getConf()); + Path exportFilePath = new Path(atlasReplInfo.getStagingDir(), ReplUtils.REPL_ATLAS_EXPORT_FILE_NAME); + long numBytesWritten = Utils.writeFile(fs, exportFilePath, inputStream); + LOG.info("Wrote to {} ({} bytes)", exportFilePath, numBytesWritten); + } catch (SemanticException ex) { + throw ex; + } catch (Exception ex) { + throw new SemanticException(ex); + } finally { + if (inputStream != null) { + try { + inputStream.close(); + } catch (IOException e) { + throw new SemanticException(e); + } + } + } + } + + private String checkHiveEntityGuid(AtlasRequestBuilder atlasRequestBuilder, String clusterName, + String srcDb) + throws SemanticException { + AtlasObjectId objectId = atlasRequestBuilder.getItemToExport(clusterName, srcDb); + Set<Map.Entry<String, Object>> entries = objectId.getUniqueAttributes().entrySet(); + if (entries == null || entries.isEmpty()) { + throw new SemanticException("Could find entries in objectId for:" + clusterName); + } + Map.Entry<String, Object> item = entries.iterator().next(); + String guid = atlasRestClient.getEntityGuid(objectId.getTypeName(), item.getKey(), (String) item.getValue()); + if (guid == null || guid.isEmpty()) { + throw new SemanticException("Entity not found:" + objectId); + } + return guid; + } + + private void createDumpMetadata(AtlasReplInfo atlasReplInfo, long lastModifiedTime) throws SemanticException { + Path dumpFile = new 
Path(atlasReplInfo.getStagingDir(), EximUtil.METADATA_NAME); + List<List<String>> listValues = new ArrayList<>(); + listValues.add( + Arrays.asList( + atlasReplInfo.getSrcFsUri(), + String.valueOf(lastModifiedTime) + ) + ); + Utils.writeOutput(listValues, dumpFile, conf, true); + LOG.debug("Stored metadata for Atlas dump at:", dumpFile.toString()); + } + + @Override + public StageType getType() { + return StageType.ATLAS_DUMP; + } + + @Override + public String getName() { + return "ATLAS_DUMP"; + } + + @Override + public boolean canExecuteInParallel() { + return false; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpWork.java new file mode 100644 index 0000000..3344152 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpWork.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.repl; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.plan.Explain; +import org.apache.hadoop.hive.ql.plan.Explain.Level; + +import java.io.Serializable; + +/** + * Atlas metadata replication work. 
+ */ +@Explain(displayName = "Atlas Meta Data Dump Work", explainLevels = {Level.USER, Level.DEFAULT, Level.EXTENDED}) +public class AtlasDumpWork implements Serializable { + private static final long serialVersionUID = 1L; + private final String srcDB; + private final Path stagingDir; + private final boolean bootstrap; + private final Path prevAtlasDumpDir; + + + public AtlasDumpWork(String srcDB, Path stagingDir, boolean bootstrap, Path prevAtlasDumpDir) { + this.srcDB = srcDB; + this.stagingDir = stagingDir; + this.bootstrap = bootstrap; + this.prevAtlasDumpDir = prevAtlasDumpDir; + } + + public boolean isBootstrap() { + return bootstrap; + } + + public Path getPrevAtlasDumpDir() { + return prevAtlasDumpDir; + } + + public String getSrcDB() { + return srcDB; + } + + public Path getStagingDir() { + return stagingDir; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasLoadTask.java new file mode 100644 index 0000000..fceded5 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasLoadTask.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.exec.repl; + +import org.apache.atlas.model.impexp.AtlasImportRequest; +import org.apache.atlas.model.impexp.AtlasImportResult; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.ErrorMsg; +import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasReplInfo; +import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasRequestBuilder; +import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasRestClientBuilder; +import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; +import org.apache.hadoop.hive.ql.parse.EximUtil; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.plan.api.StageType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.Serializable; +import java.net.MalformedURLException; +import java.net.URL; +import java.nio.charset.Charset; + +/** + * Atlas Metadata Replication Load Task. 
+ **/ +public class AtlasLoadTask extends Task<AtlasLoadWork> implements Serializable { + private static final long serialVersionUID = 1L; + private static final transient Logger LOG = LoggerFactory.getLogger(AtlasLoadTask.class); + + @Override + public int execute() { + try { + AtlasReplInfo atlasReplInfo = createAtlasReplInfo(); + LOG.info("Loading atlas metadata from srcDb: {} to tgtDb: {} from staging: {}", + atlasReplInfo.getSrcDB(), atlasReplInfo.getTgtDB(), atlasReplInfo.getStagingDir()); + int importCount = importAtlasMetadata(atlasReplInfo); + LOG.info("Atlas entities import count {}", importCount); + return 0; + } catch (Exception e) { + LOG.error("Exception while loading atlas metadata", e); + setException(e); + return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode(); + } + } + + private AtlasReplInfo createAtlasReplInfo() throws SemanticException, MalformedURLException { + String errorFormat = "%s is mandatory config for Atlas metadata replication"; + //Also validates URL for endpoint. 
+ String endpoint = new URL(ReplUtils.getNonEmpty(HiveConf.ConfVars.REPL_ATLAS_ENDPOINT.varname, conf, errorFormat)) + .toString(); + String srcCluster = ReplUtils.getNonEmpty(HiveConf.ConfVars.REPL_SOURCE_CLUSTER_NAME.varname, conf, errorFormat); + String tgtCluster = ReplUtils.getNonEmpty(HiveConf.ConfVars.REPL_TARGET_CLUSTER_NAME.varname, conf, errorFormat); + AtlasReplInfo atlasReplInfo = new AtlasReplInfo(endpoint, work.getSrcDB(), work.getTgtDB(), + srcCluster, tgtCluster, work.getStagingDir(), conf); + atlasReplInfo.setSrcFsUri(getStoredFsUri(atlasReplInfo.getStagingDir())); + atlasReplInfo.setTgtFsUri(conf.get(ReplUtils.DEFAULT_FS_CONFIG)); + return atlasReplInfo; + } + + private String getStoredFsUri(Path atlasDumpDir) throws SemanticException { + Path metadataPath = new Path(atlasDumpDir, EximUtil.METADATA_NAME); + BufferedReader br = null; + try { + FileSystem fs = metadataPath.getFileSystem(conf); + br = new BufferedReader(new InputStreamReader(fs.open(metadataPath), Charset.defaultCharset())); + String line = br.readLine(); + if (line == null) { + throw new SemanticException("Could not read stored src FS Uri from atlas metadata file"); + } + String[] lineContents = line.split("\t", 5); + return lineContents[0]; + } catch (Exception ex) { + throw new SemanticException(ex); + } finally { + if (br != null) { + try { + br.close(); + } catch (IOException e) { + throw new SemanticException(e); + } + } + } + } + + private int importAtlasMetadata(AtlasReplInfo atlasReplInfo) throws Exception { + AtlasRequestBuilder atlasRequestBuilder = new AtlasRequestBuilder(); + AtlasImportRequest importRequest = atlasRequestBuilder.createImportRequest( + atlasReplInfo.getSrcDB(), atlasReplInfo.getTgtDB(), + atlasReplInfo.getSrcCluster(), atlasReplInfo.getTgtCluster(), + atlasReplInfo.getSrcFsUri(), atlasReplInfo.getTgtFsUri()); + AtlasImportResult result = new AtlasRestClientBuilder(atlasReplInfo.getAtlasEndpoint()) + 
.getClient(atlasReplInfo.getConf()).importData(importRequest, atlasReplInfo); + if (result == null || result.getProcessedEntities() == null) { + LOG.info("No Atlas entity found"); + return 0; + } + return result.getProcessedEntities().size(); + } + + @Override + public StageType getType() { + return StageType.ATLAS_LOAD; + } + + @Override + public String getName() { + return "ATLAS_LOAD"; + } + + @Override + public boolean canExecuteInParallel() { + return false; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasLoadWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasLoadWork.java new file mode 100644 index 0000000..4dc1ea8 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasLoadWork.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.repl; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.plan.Explain; +import org.apache.hadoop.hive.ql.plan.Explain.Level; + +import java.io.Serializable; + +/** + * Atlas metadata replication load work. 
+ */ +@Explain(displayName = "Atlas Meta Data Load Work", explainLevels = {Level.USER, Level.DEFAULT, Level.EXTENDED}) +public class AtlasLoadWork implements Serializable { + private static final long serialVersionUID = 1L; + private final String srcDB; + private final String tgtDB; + private final Path stagingDir; + + public AtlasLoadWork(String srcDB, String tgtDB, Path stagingDir) { + this.srcDB = srcDB; + this.tgtDB = tgtDB; + this.stagingDir = stagingDir; + } + + public static long getSerialVersionUID() { + return serialVersionUID; + } + + public String getSrcDB() { + return srcDB; + } + + public String getTgtDB() { + return tgtDB; + } + + public Path getStagingDir() { + return stagingDir; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java index 516b6f4..046b6a0 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java @@ -147,6 +147,10 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable { Path currentDumpPath = getCurrentDumpPath(dumpRoot, isBootstrap); Path hiveDumpRoot = new Path(currentDumpPath, ReplUtils.REPL_HIVE_BASE_DIR); work.setCurrentDumpPath(currentDumpPath); + if (shouldDumpAtlasMetadata()) { + addAtlasDumpTask(isBootstrap, previousValidHiveDumpPath); + LOG.info("Added task to dump atlas metadata."); + } if (shouldDumpAuthorizationMetadata()) { initiateAuthorizationDumpTask(); } @@ -195,6 +199,10 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable { return conf.getBoolVar(HiveConf.ConfVars.REPL_INCLUDE_AUTHORIZATION_METADATA); } + private boolean shouldDumpAtlasMetadata() { + return conf.getBoolVar(HiveConf.ConfVars.REPL_INCLUDE_ATLAS_METADATA); + } + private Path getEncodedDumpRootPath() throws UnsupportedEncodingException { return new Path(conf.getVar(HiveConf.ConfVars.REPLDIR), 
Base64.getEncoder().encodeToString(work.dbNameOrPattern.toLowerCase() @@ -228,6 +236,17 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable { } } + private void addAtlasDumpTask(boolean bootstrap, Path prevHiveDumpDir) { + Path atlasDumpDir = new Path(work.getCurrentDumpPath(), ReplUtils.REPL_ATLAS_BASE_DIR); + Path prevAtlasDumpDir = prevHiveDumpDir == null ? null + : new Path(prevHiveDumpDir.getParent(), ReplUtils.REPL_ATLAS_BASE_DIR); + AtlasDumpWork atlasDumpWork = new AtlasDumpWork(work.dbNameOrPattern, atlasDumpDir, bootstrap, prevAtlasDumpDir); + Task<?> atlasDumpTask = TaskFactory.get(atlasDumpWork, conf); + childTasks = new ArrayList<>(); + childTasks.add(atlasDumpTask); + } + + private void finishRemainingTasks() throws SemanticException { Path dumpAckFile = new Path(work.getCurrentDumpPath(), ReplUtils.REPL_HIVE_BASE_DIR + File.separator diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java index 7a30962..792e331 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java @@ -107,6 +107,9 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable { } work.setRootTask(this); this.parentTasks = null; + if (shouldLoadAtlasMetadata()) { + addAtlasLoadTask(); + } if (shouldLoadAuthorizationMetadata()) { initiateAuthorizationLoadTask(); } @@ -141,8 +144,23 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable { childTasks.add(rangerLoadTask); } else { throw new SemanticException("Authorizer " + conf.getVar(HiveConf.ConfVars.REPL_AUTHORIZATION_PROVIDER_SERVICE) - + " not supported for replication "); + + " not supported for replication "); + } + } + + private void addAtlasLoadTask() throws HiveException { + Path atlasDumpDir = new Path(new Path(work.dumpDirectory).getParent(), 
ReplUtils.REPL_ATLAS_BASE_DIR); + LOG.info("Adding task to load Atlas metadata from {} ", atlasDumpDir); + AtlasLoadWork atlasLoadWork = new AtlasLoadWork(work.getSourceDbName(), work.dbNameToLoadIn, atlasDumpDir); + Task<?> atlasLoadTask = TaskFactory.get(atlasLoadWork, conf); + if (childTasks == null) { + childTasks = new ArrayList<>(); } + childTasks.add(atlasLoadTask); + } + + private boolean shouldLoadAtlasMetadata() { + return conf.getBoolVar(HiveConf.ConfVars.REPL_INCLUDE_ATLAS_METADATA); } private int executeBootStrapLoad() throws Exception { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasReplInfo.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasReplInfo.java new file mode 100644 index 0000000..b0923d7 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasReplInfo.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.repl.atlas; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; + +/** + * Atlas metadata replication info holder. 
+ */ +public class AtlasReplInfo { + private final String srcDB; + private final String tgtDB; + private final String srcCluster; + private final String tgtCluster; + private final Path stagingDir; + private final HiveConf conf; + private final String atlasEndpoint; + private String srcFsUri; + private String tgtFsUri; + private long timeStamp; + + public AtlasReplInfo(String atlasEndpoint, String srcDB, String tgtDB, String srcCluster, + String tgtCluster, Path stagingDir, HiveConf conf) { + this.atlasEndpoint = atlasEndpoint; + this.srcDB = srcDB; + this.tgtDB = tgtDB; + this.srcCluster = srcCluster; + this.tgtCluster = tgtCluster; + this.stagingDir = stagingDir; + this.conf = conf; + } + + public String getSrcDB() { + return srcDB; + } + + public String getTgtDB() { + return tgtDB; + } + + public String getSrcCluster() { + return srcCluster; + } + + public String getTgtCluster() { + return tgtCluster; + } + + public Path getStagingDir() { + return stagingDir; + } + + public HiveConf getConf() { + return conf; + } + + public String getAtlasEndpoint() { + return atlasEndpoint; + } + + public String getSrcFsUri() { + return srcFsUri; + } + + public void setSrcFsUri(String srcFsUri) { + this.srcFsUri = srcFsUri; + } + + public String getTgtFsUri() { + return tgtFsUri; + } + + public void setTgtFsUri(String tgtFsUri) { + this.tgtFsUri = tgtFsUri; + } + + public long getTimeStamp() { + return timeStamp; + } + + public void setTimeStamp(long timeStamp) { + this.timeStamp = timeStamp; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRequestBuilder.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRequestBuilder.java new file mode 100644 index 0000000..3c72f8f --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRequestBuilder.java @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.repl.atlas; + +import org.apache.atlas.model.impexp.AtlasExportRequest; +import org.apache.atlas.model.impexp.AtlasImportRequest; +import org.apache.atlas.model.impexp.AttributeTransform; +import org.apache.atlas.model.instance.AtlasObjectId; +import org.apache.atlas.type.AtlasType; +import org.apache.hadoop.hive.metastore.utils.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.ArrayList; +import java.util.HashMap; + +/** + * Helper class to create export/import request. 
+ */ +public class AtlasRequestBuilder { + private static final Logger LOG = LoggerFactory.getLogger(AtlasRequestBuilder.class); + public static final String ATTRIBUTE_QUALIFIED_NAME = "qualifiedName"; + static final String ATLAS_TYPE_HIVE_DB = "hive_db"; + static final String ATLAS_TYPE_HIVE_SD = "hive_storagedesc"; + static final String QUALIFIED_NAME_FORMAT = "%s@%s"; + + private static final String ATTRIBUTE_NAME_CLUSTER_NAME = ".clusterName"; + private static final String ATTRIBUTE_NAME_NAME = ".name"; + private static final String ATTRIBUTE_NAME_REPLICATED_TO = "replicatedTo"; + private static final String ATTRIBUTE_NAME_REPLICATED_FROM = "replicatedFrom"; + private static final String ATTRIBUTE_NAME_LOCATION = ".location"; + + private static final String HIVE_DB_CLUSTER_NAME = ATLAS_TYPE_HIVE_DB + ATTRIBUTE_NAME_CLUSTER_NAME; + private static final String HIVE_DB_NAME = ATLAS_TYPE_HIVE_DB + ATTRIBUTE_NAME_NAME; + private static final String HIVE_DB_LOCATION = ATLAS_TYPE_HIVE_DB + ATTRIBUTE_NAME_LOCATION; + private static final String HIVE_SD_LOCATION = ATLAS_TYPE_HIVE_SD + ATTRIBUTE_NAME_LOCATION; + + private static final String TRANSFORM_ENTITY_SCOPE = "__entity"; + private static final String REPLICATED_TAG_NAME = "%s_replicated"; + + public AtlasExportRequest createExportRequest(AtlasReplInfo atlasReplInfo, String srcAtlasServer) { + List<AtlasObjectId> itemsToExport = getItemsToExport(atlasReplInfo, srcAtlasServer); + Map<String, Object> options = getOptions(atlasReplInfo); + return createRequest(itemsToExport, options); + } + + private List<AtlasObjectId> getItemsToExport(AtlasReplInfo atlasReplInfo, String srcAtlasServerName) { + List<AtlasObjectId> atlasObjectIds = new ArrayList<>(); + final String qualifiedName = getQualifiedName(srcAtlasServerName, atlasReplInfo.getSrcDB()); + atlasObjectIds.add(new AtlasObjectId(ATLAS_TYPE_HIVE_DB, ATTRIBUTE_QUALIFIED_NAME, qualifiedName)); + return atlasObjectIds; + } + + private AtlasExportRequest 
createRequest(final List<AtlasObjectId> itemsToExport, + final Map<String, Object> options) { + AtlasExportRequest request = new AtlasExportRequest() { + { + setItemsToExport(itemsToExport); + setOptions(options); + } + }; + LOG.debug("createRequest: {}" + request); + return request; + } + + private Map<String, Object> getOptions(AtlasReplInfo atlasReplInfo) { + String targetCluster = atlasReplInfo.getTgtCluster(); + Map<String, Object> options = new HashMap<>(); + options.put(AtlasExportRequest.OPTION_FETCH_TYPE, AtlasExportRequest.FETCH_TYPE_INCREMENTAL); + options.put(AtlasExportRequest.FETCH_TYPE_INCREMENTAL_CHANGE_MARKER, atlasReplInfo.getTimeStamp()); + options.put(AtlasExportRequest.OPTION_SKIP_LINEAGE, true); + if (targetCluster != null && !targetCluster.isEmpty()) { + options.put(AtlasExportRequest.OPTION_KEY_REPLICATED_TO, targetCluster); + } + return options; + } + + public AtlasObjectId getItemToExport(String srcCluster, String srcDB) { + final String qualifiedName = getQualifiedName(srcCluster, srcDB); + return new AtlasObjectId(ATLAS_TYPE_HIVE_DB, ATTRIBUTE_QUALIFIED_NAME, qualifiedName); + } + + private String getQualifiedName(String clusterName, String srcDb) { + String qualifiedName = String.format(QUALIFIED_NAME_FORMAT, srcDb.toLowerCase(), clusterName); + LOG.debug("Atlas getQualifiedName: {}", qualifiedName); + return qualifiedName; + } + + public AtlasImportRequest createImportRequest(String sourceDataSet, String targetDataSet, + String sourceClusterName, String targetClusterName, + String sourceFsEndpoint, String targetFsEndpoint) { + AtlasImportRequest request = new AtlasImportRequest(); + addTransforms(request.getOptions(), + sourceClusterName, targetClusterName, + sourceDataSet, targetDataSet, + sourceFsEndpoint, targetFsEndpoint); + addReplicatedFrom(request.getOptions(), sourceClusterName); + LOG.debug("Atlas metadata import request: {}" + request); + return request; + } + + private void addTransforms(Map<String, String> options, String 
srcClusterName, + String tgtClusterName, String sourceDataSet, String targetDataSet, + String sourcefsEndpoint, String targetFsEndpoint) { + List<AttributeTransform> transforms = new ArrayList<>(); + String sanitizedSourceClusterName = sanitizeForClassificationName(srcClusterName); + addClassificationTransform(transforms, + String.format(REPLICATED_TAG_NAME, sanitizedSourceClusterName)); + addClearReplicationAttributesTransform(transforms); + addClusterRenameTransform(transforms, srcClusterName, tgtClusterName); + if (!sourceDataSet.equals(targetDataSet)) { + addDataSetRenameTransform(transforms, sourceDataSet, targetDataSet); + } + addLocationTransform(transforms, sourcefsEndpoint, targetFsEndpoint); + options.put(AtlasImportRequest.TRANSFORMERS_KEY, AtlasType.toJson(transforms)); + } + + private void addLocationTransform(List<AttributeTransform> transforms, String srcFsUri, String tgtFsUri) { + transforms.add(create( + HIVE_DB_LOCATION, "STARTS_WITH_IGNORE_CASE: " + srcFsUri, + HIVE_DB_LOCATION, "REPLACE_PREFIX: = :" + srcFsUri + "=" + tgtFsUri + ) + ); + transforms.add(create( + HIVE_SD_LOCATION, "STARTS_WITH_IGNORE_CASE: " + srcFsUri, + HIVE_SD_LOCATION, "REPLACE_PREFIX: = :" + srcFsUri + "=" + tgtFsUri + ) + ); + } + + private void addDataSetRenameTransform(List<AttributeTransform> transforms, + String sourceDataSet, String targetDataSet) { + transforms.add(create( + HIVE_DB_NAME, "EQUALS: " + sourceDataSet, + HIVE_DB_NAME, "SET: " + targetDataSet)); + } + + private void addClusterRenameTransform(List<AttributeTransform> transforms, + String srcClusterName, String tgtClustername) { + transforms.add(create(HIVE_DB_CLUSTER_NAME, "EQUALS: " + srcClusterName, + HIVE_DB_CLUSTER_NAME, "SET: " + tgtClustername)); + } + + private void addReplicatedFrom(Map<String, String> options, String sourceClusterName) { + options.put(AtlasImportRequest.OPTION_KEY_REPLICATED_FROM, sourceClusterName); + } + + private void addClassificationTransform(List<AttributeTransform> 
transforms, String classificationName) { + transforms.add(create("__entity", "topLevel: ", + "__entity", "ADD_CLASSIFICATION: " + classificationName)); + } + + private String sanitizeForClassificationName(String s) { + if (StringUtils.isEmpty(s)) { + return s; + } + return s.replace('-', '_').replace(' ', '_'); + } + + private void addClearReplicationAttributesTransform(List<AttributeTransform> transforms) { + Map<String, String> actions = new HashMap<>(); + actions.put(TRANSFORM_ENTITY_SCOPE + "." + ATTRIBUTE_NAME_REPLICATED_TO, "CLEAR:"); + actions.put(TRANSFORM_ENTITY_SCOPE + "." + ATTRIBUTE_NAME_REPLICATED_FROM, "CLEAR:"); + + transforms.add(new AttributeTransform(null, actions)); + } + + private AttributeTransform create(String conditionLhs, String conditionRhs, + String actionLhs, String actionRhs) { + return new AttributeTransform(Collections.singletonMap(conditionLhs, conditionRhs), + Collections.singletonMap(actionLhs, actionRhs)); + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClient.java new file mode 100644 index 0000000..dd72f83 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClient.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.repl.atlas; + +import org.apache.atlas.model.impexp.AtlasExportRequest; +import org.apache.atlas.model.impexp.AtlasImportRequest; +import org.apache.atlas.model.impexp.AtlasImportResult; +import org.apache.atlas.model.impexp.AtlasServer; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import java.io.InputStream; + +/** + * Atlas RESTClient interface for Atlas' REST APIs. + */ +public interface AtlasRestClient { + + InputStream exportData(AtlasExportRequest request) throws Exception; + + AtlasImportResult importData(AtlasImportRequest request, AtlasReplInfo atlasReplInfo) throws Exception; + + AtlasServer getServer(String endpoint) throws SemanticException; + + String getEntityGuid(final String entityType, final String attributeName, final String qualifiedName) + throws SemanticException; + + boolean getStatus() throws SemanticException; +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClientBuilder.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClientBuilder.java new file mode 100644 index 0000000..37b623f --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClientBuilder.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.repl.atlas; + +import org.apache.atlas.ApplicationProperties; +import org.apache.atlas.AtlasClientV2; +import org.apache.atlas.AtlasException; +import org.apache.commons.configuration.ConfigurationConverter; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.security.UserGroupInformation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Properties; + +/** + * Builder for AtlasRestClient. + */ +public class AtlasRestClientBuilder { + private static final Logger LOG = LoggerFactory.getLogger(AtlasRestClientBuilder.class); + private static final String ATLAS_PROPERTY_CLIENT_HA_RETRIES_KEY = "atlas.client.ha.retries"; + private static final String ATLAS_PROPERTY_CLIENT_HA_SLEEP_INTERVAL_MS_KEY = "atlas.client.ha.sleep.interval.ms"; + private static final String ATLAS_PROPERTY_REST_ADDRESS = "atlas.rest.address"; + private static final String ATLAS_PROPERTY_AUTH_KERBEROS = "atlas.authentication.method.kerberos"; + private static final String URL_SEPERATOR = ","; + + private UserGroupInformation userGroupInformation; + protected String incomingUrl; + protected String[] baseUrls; + + public AtlasRestClientBuilder(String urls) { + this.incomingUrl = urls; + if (urls.contains(URL_SEPERATOR)) { + this.baseUrls = urls.split(URL_SEPERATOR); + } else { + this.baseUrls = new String[]{urls}; + } + } + + public AtlasRestClient getClient(HiveConf conf) throws SemanticException { + if 
(conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST_REPL)) { + return new NoOpAtlasRestClient(); + } + return create(); + } + + private AtlasRestClient create() throws SemanticException { + if (baseUrls == null || baseUrls.length == 0) { + throw new SemanticException("baseUrls is not set."); + } + setUGInfo(); + initializeAtlasApplicationProperties(); + AtlasClientV2 clientV2 = new AtlasClientV2(this.userGroupInformation, + this.userGroupInformation.getShortUserName(), baseUrls); + return new AtlasRestClientImpl(clientV2); + } + + private AtlasRestClientBuilder setUGInfo() throws SemanticException { + try { + this.userGroupInformation = UserGroupInformation.getLoginUser(); + LOG.info("AuthStrategy: Kerberos : urls: {} : userGroupInformation: {}", baseUrls, userGroupInformation); + } catch (Exception e) { + throw new SemanticException("Error: setAuthStrategy: UserGroupInformation.getLoginUser: failed!", e); + } + return this; + } + + private void initializeAtlasApplicationProperties() throws SemanticException { + try { + Properties props = new Properties(); + props.setProperty(ATLAS_PROPERTY_CLIENT_HA_RETRIES_KEY, "1"); + props.setProperty(ATLAS_PROPERTY_CLIENT_HA_SLEEP_INTERVAL_MS_KEY, "0"); + props.setProperty(ATLAS_PROPERTY_REST_ADDRESS, incomingUrl); + props.setProperty(ATLAS_PROPERTY_AUTH_KERBEROS, "true"); + ApplicationProperties.set(ConfigurationConverter.getConfiguration(props)); + } catch (AtlasException e) { + throw new SemanticException(e); + } + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClientImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClientImpl.java new file mode 100644 index 0000000..c8d738e --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClientImpl.java @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.repl.atlas; + +import org.apache.atlas.AtlasClientV2; +import org.apache.atlas.AtlasServiceException; +import org.apache.atlas.model.impexp.AtlasExportRequest; +import org.apache.atlas.model.impexp.AtlasImportRequest; +import org.apache.atlas.model.impexp.AtlasImportResult; +import org.apache.atlas.model.impexp.AtlasServer; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.InputStream; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static com.sun.jersey.api.client.ClientResponse.Status.NOT_FOUND; + +/** + * Implementation of RESTClient, encapsulates Atlas' REST APIs. 
+ */
+public class AtlasRestClientImpl extends RetryingClient implements AtlasRestClient {
+  private static final Logger LOG = LoggerFactory.getLogger(AtlasRestClientImpl.class);
+  // Upper bound on how long a single entity lookup in getEntityGuid may block.
+  private static final long ENTITY_API_TIMEOUT_SECONDS = 10;
+  private final AtlasClientV2 clientV2;
+
+  public AtlasRestClientImpl(AtlasClientV2 clientV2) {
+    this.clientV2 = clientV2;
+  }
+
+  /**
+   * Runs {@code callable} on a dedicated single-thread executor and waits at most {@code timeout}
+   * for its result.
+   * <p>
+   * On timeout or interruption the task is cancelled (its thread is interrupted) and the exception
+   * is rethrown. An exception thrown by the task itself is unwrapped from the
+   * {@link ExecutionException} and rethrown as-is.
+   */
+  private <T> T runWithTimeout(Callable<T> callable, long timeout, TimeUnit timeUnit) throws Exception {
+    final ExecutorService executor = Executors.newSingleThreadExecutor();
+    final Future<T> future = executor.submit(callable);
+    // Nothing else is submitted; the worker thread is released once this task completes.
+    executor.shutdown();
+    try {
+      return future.get(timeout, timeUnit);
+    } catch (TimeoutException e) {
+      future.cancel(true);
+      throw e;
+    } catch (InterruptedException e) {
+      // Do not leave the task running, and keep the caller's interrupt status visible.
+      future.cancel(true);
+      Thread.currentThread().interrupt();
+      throw e;
+    } catch (ExecutionException e) {
+      Throwable t = e.getCause();
+      if (t instanceof Error) {
+        throw (Error) t;
+      } else if (t instanceof Exception) {
+        throw (Exception) t;
+      } else {
+        throw new IllegalStateException(t);
+      }
+    }
+  }
+
+  /**
+   * Requests an Atlas metadata export through {@code invokeWithRetry}, which retries while Atlas
+   * reports another import/export in progress.
+   *
+   * @return the stream returned by {@code AtlasClientV2.exportData}
+   */
+  @Override
+  public InputStream exportData(AtlasExportRequest request) throws Exception {
+    LOG.debug("exportData: {}", request);
+    return invokeWithRetry(() -> clientV2.exportData(request), null);
+  }
+
+  /**
+   * Imports the file {@link ReplUtils#REPL_ATLAS_EXPORT_FILE_NAME} from the staging directory of
+   * {@code atlasReplInfo} into Atlas.
+   *
+   * @return the result of the Atlas import, or the result built by
+   *     {@code getDefaultAtlasImportResult} when the export file does not exist
+   */
+  @Override
+  public AtlasImportResult importData(AtlasImportRequest request, AtlasReplInfo atlasReplInfo) throws Exception {
+    AtlasImportResult defaultResult = getDefaultAtlasImportResult(request);
+    Path exportFilePath = new Path(atlasReplInfo.getStagingDir(), ReplUtils.REPL_ATLAS_EXPORT_FILE_NAME);
+    FileSystem fs = FileSystem.get(exportFilePath.toUri(), atlasReplInfo.getConf());
+    if (!fs.exists(exportFilePath)) {
+      return defaultResult;
+    }
+    LOG.debug("Atlas import data request: {}", request);
+    return invokeWithRetry(() -> {
+      // Open the file afresh on every attempt so a retry always sends the complete export.
+      try (InputStream is = fs.open(exportFilePath)) {
+        return clientV2.importData(request, is);
+      }
+    }, defaultResult);
+  }
+
+  /** Result used when there is no export file to import. */
+  private AtlasImportResult getDefaultAtlasImportResult(AtlasImportRequest request) {
+    return new AtlasImportResult(request, "", "", "", 0L);
+  }
+
+  /** Returns the status code carried by {@code e}, or -1 when it carries none. */
+  private static int getStatusCode(AtlasServiceException e) {
+    return e.getStatus() != null ? e.getStatus().getStatusCode() : -1;
+  }
+
+  /**
+   * Returns the Atlas server entry for {@code endpoint}, or {@code null} if Atlas answers with a
+   * NOT_FOUND status.
+   *
+   * @throws SemanticException for any other Atlas service failure
+   */
+  @Override
+  public AtlasServer getServer(String endpoint) throws SemanticException {
+    try {
+      return clientV2.getServer(endpoint);
+    } catch (AtlasServiceException e) {
+      if (getStatusCode(e) != NOT_FOUND.getStatusCode()) {
+        throw new SemanticException("Exception while getServer ", e);
+      }
+      LOG.warn("getServer of: {} returned: {}", endpoint, e.getMessage());
+    }
+    return null;
+  }
+
+  /**
+   * Looks up the entity of type {@code entityType} whose {@code attributeName} equals
+   * {@code qualifiedName} and returns its GUID.
+   *
+   * @return the GUID, or {@code null} if no entity is returned or Atlas answers NOT_FOUND
+   * @throws SemanticException on any other failure, including the lookup taking longer than
+   *     {@link #ENTITY_API_TIMEOUT_SECONDS} seconds
+   */
+  @Override
+  public String getEntityGuid(final String entityType,
+                              final String attributeName, final String qualifiedName) throws SemanticException {
+    final Map<String, String> attributes = new HashMap<>();
+    attributes.put(attributeName, qualifiedName);
+    try {
+      AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo = runWithTimeout(
+          () -> clientV2.getEntityByAttribute(entityType, attributes),
+          ENTITY_API_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+      if (entityWithExtInfo == null || entityWithExtInfo.getEntity() == null) {
+        LOG.warn("Atlas entity cannot be retrieved using: type: {} and {} - {}",
+            entityType, attributeName, qualifiedName);
+        return null;
+      }
+      return entityWithExtInfo.getEntity().getGuid();
+    } catch (AtlasServiceException e) {
+      if (getStatusCode(e) != NOT_FOUND.getStatusCode()) {
+        throw new SemanticException("Exception while getEntityGuid ", e);
+      }
+      LOG.warn("getEntityGuid: Could not retrieve entity guid for: {}-{}-{}: {}",
+          entityType, attributeName, qualifiedName, e.getMessage());
+      return null;
+    } catch (Exception e) {
+      throw new SemanticException(e);
+    }
+  }
+
+  /**
+   * Returns what {@code AtlasClientV2.isServerReady} reports.
+   *
+   * @throws SemanticException wrapping any Atlas service failure
+   */
+  @Override
+  public boolean getStatus() throws SemanticException {
+    try {
+      return clientV2.isServerReady();
+    } catch (AtlasServiceException e) {
+      throw new SemanticException(e);
+    }
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/NoOpAtlasRestClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/NoOpAtlasRestClient.java
new file mode 100644
index 0000000..59dcbf7
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/NoOpAtlasRestClient.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.atlas;
+
+import org.apache.atlas.model.impexp.AtlasExportRequest;
+import org.apache.atlas.model.impexp.AtlasImportRequest;
+import org.apache.atlas.model.impexp.AtlasImportResult;
+import org.apache.atlas.model.impexp.AtlasServer;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.UUID;
+
+/**
+ * {@link AtlasRestClient} that never contacts an Atlas server.
+ * Intended for tests: every call succeeds and returns a fixed or freshly generated dummy value.
+ */
+public class NoOpAtlasRestClient implements AtlasRestClient {
+
+  /** Ignores {@code request} and returns a stream over the UTF-8 bytes of "Dummy". */
+  @Override
+  public InputStream exportData(AtlasExportRequest request) {
+    return new ByteArrayInputStream("Dummy".getBytes(StandardCharsets.UTF_8));
+  }
+
+  /** Imports nothing; returns a result built from {@code request} with empty strings and 0L. */
+  @Override
+  public AtlasImportResult importData(AtlasImportRequest request, AtlasReplInfo atlasReplInfo) {
+    return new AtlasImportResult(request, "", "", "", 0L);
+  }
+
+  /** Returns a default-constructed {@link AtlasServer} regardless of {@code endpoint}. */
+  @Override
+  public AtlasServer getServer(String endpoint) {
+    return new AtlasServer();
+  }
+
+  /** Performs no lookup; returns a new random UUID string on every call. */
+  @Override
+  public String getEntityGuid(final String entityType,
+                              final String attributeName, final String qualifiedName) {
+    return UUID.randomUUID().toString();
+  }
+
+  /** Always returns {@code true}. */
+  @Override
+  public boolean getStatus() {
+    return true;
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/RetryingClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/RetryingClient.java
new file mode 100644
index 0000000..dbc065a
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/RetryingClient.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.atlas;
+
+import com.sun.jersey.api.client.UniformInterfaceException;
+import org.apache.atlas.AtlasServiceException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Callable;
+
+/**
+ * Base class that adds retry handling to Atlas service calls.
+ * <p>
+ * A call is retried, pausing a little longer before each new attempt, while Atlas reports that
+ * another import or export is in progress. Failures that Atlas uses to signal that there was
+ * nothing to do are not treated as errors.
+ */
+public class RetryingClient {
+  private static final Logger LOG = LoggerFactory.getLogger(RetryingClient.class);
+  // The pause before retry n is n times this value: 30s, 60s, 90s, ...
+  private static final int PAUSE_DURATION_INCREMENT_IN_MS = 30 * 1000;
+  private static final int MAX_RETRY_COUNT = 5;
+  private static final String ERROR_MESSAGE_NO_ENTITIES = "no entities to create/update";
+  private static final String ERROR_MESSAGE_IN_PROGRESS = "import or export is in progress";
+  private static final String ATLAS_ERROR_CODE_IMPORT_EMPTY_ZIP = "empty ZIP file";
+
+  /**
+   * Invokes {@code func}, making up to {@value #MAX_RETRY_COUNT} attempts in total while Atlas
+   * reports a concurrent import/export.
+   *
+   * @param func the service call to run
+   * @param defaultReturnValue value returned when every attempt found another import/export in
+   *     progress
+   * @return the result of {@code func}; {@code null} if the failure is one that
+   *     {@code processInvalidParameterException} accepts; otherwise {@code defaultReturnValue} once
+   *     all attempts are exhausted
+   * @throws Exception wrapping any other failure of {@code func}, or if a pause is interrupted
+   */
+  protected <T> T invokeWithRetry(Callable<T> func, T defaultReturnValue) throws Exception {
+    for (int attempt = 1; attempt <= MAX_RETRY_COUNT; attempt++) {
+      try {
+        LOG.debug("Invoking method: {}, attempt {} of {}", func.getClass().getName(), attempt, MAX_RETRY_COUNT);
+        return func.call();
+      } catch (Exception e) {
+        if (isImportExportInProgress(e)) {
+          // Pausing after the final attempt would only delay giving up.
+          if (attempt < MAX_RETRY_COUNT) {
+            pauseBeforeRetry(attempt);
+          }
+          continue;
+        }
+        if (processInvalidParameterException(e)) {
+          // NOTE(review): returns null rather than defaultReturnValue; confirm callers expect this.
+          return null;
+        }
+        LOG.error("Atlas call {} failed", func.getClass().getName(), e);
+        throw new Exception(e);
+      }
+    }
+    LOG.warn("Atlas import/export still in progress after {} attempts; returning the default value",
+        MAX_RETRY_COUNT);
+    return defaultReturnValue;
+  }
+
+  /**
+   * Returns true if {@code e} should be treated as "nothing to do": any
+   * {@link UniformInterfaceException}, or an {@link AtlasServiceException} whose message says there
+   * were no entities to create/update or that the ZIP file was empty.
+   */
+  private boolean processInvalidParameterException(Exception e) {
+    if (e instanceof UniformInterfaceException) {
+      return true;
+    }
+    if (!(e instanceof AtlasServiceException) || e.getMessage() == null) {
+      return false;
+    }
+    String message = e.getMessage();
+    return message.contains(ERROR_MESSAGE_NO_ENTITIES)
+        || message.contains(ATLAS_ERROR_CODE_IMPORT_EMPTY_ZIP);
+  }
+
+  /** Returns true if {@code e} is an {@link AtlasServiceException} reporting an import/export in progress. */
+  private boolean isImportExportInProgress(Exception e) {
+    return e instanceof AtlasServiceException
+        && e.getMessage() != null
+        && e.getMessage().contains(ERROR_MESSAGE_IN_PROGRESS);
+  }
+
+  /**
+   * Sleeps for {@code attempt} times {@link #PAUSE_DURATION_INCREMENT_IN_MS} milliseconds.
+   *
+   * @throws Exception wrapping the {@link InterruptedException} if the sleep is interrupted; the
+   *     thread's interrupt status is restored first
+   */
+  private void pauseBeforeRetry(int attempt) throws Exception {
+    long pauseDuration = (long) PAUSE_DURATION_INCREMENT_IN_MS * attempt;
+    LOG.info("Atlas in-progress operation detected. Will pause for: {} ms", pauseDuration);
+    try {
+      Thread.sleep(pauseDuration);
+    } catch (InterruptedException intEx) {
+      Thread.currentThread().interrupt();
+      LOG.error("Pause wait interrupted!", intEx);
+      throw new Exception(intEx);
+    }
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
index 543ceca..c0aadb5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
 import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
 import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
+import org.apache.hadoop.hive.metastore.utils.StringUtils;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.ddl.DDLWork;
 import org.apache.hadoop.hive.ql.ddl.table.misc.properties.AlterTableSetPropertiesDesc;
@@ -82,6 +83,19 @@ public class ReplUtils {
 
   // Root base directory name for ranger.
public static final String REPL_RANGER_BASE_DIR = "ranger";
+  // Root directory name used for Atlas replication data.
+  public static final String REPL_ATLAS_BASE_DIR = "atlas";
+
+  // File name of the Atlas metadata export archive.
+  public static final String REPL_ATLAS_EXPORT_FILE_NAME = "atlas_export.zip";
+
+  // Hadoop configuration key naming the default file system.
+  public static final String DEFAULT_FS_CONFIG = "fs.defaultFS";
+
+  // Separates data center and cluster when a cluster name carries both, e.g. dc$mycluster1.
+  public static final String CLUSTER_NAME_SEPARATOR = "$";
+
+
   // Name of the directory which stores the list of tables included in the policy in case of table level replication.
   // One file per database, named after the db name. The directory is not created for db level replication.
   public static final String REPL_TABLE_LIST_DIR_NAME = "_tables";
@@ -184,6 +198,25 @@ public class ReplUtils {
     return false;
   }
 
+  /**
+   * Reads a configuration property that is required to have a value.
+   *
+   * @param configParam name of the property to read from {@code hiveConf}
+   * @param hiveConf configuration holding the property
+   * @param errorMsgFormat {@link String#format} pattern for the error message; {@code configParam}
+   *     is passed as its only argument
+   * @return the property value
+   * @throws SemanticException if {@code StringUtils.isEmpty} reports the value as empty
+   */
+  public static String getNonEmpty(String configParam, HiveConf hiveConf, String errorMsgFormat)
+      throws SemanticException {
+    final String configValue = hiveConf.get(configParam);
+    if (!StringUtils.isEmpty(configValue)) {
+      return configValue;
+    }
+    throw new SemanticException(String.format(errorMsgFormat, configParam));
+  }
+
   public static boolean isTableMigratingToTransactional(HiveConf conf,
       org.apache.hadoop.hive.metastore.api.Table tableObj)
   throws TException, IOException {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
index 44320a5..154f028 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hive.ql.parse.repl.dump;
 
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.ValidTxnList;
@@ -46,6 +47,7 @@ import org.slf4j.LoggerFactory;
 import
java.io.DataOutputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -57,6 +59,7 @@ import java.util.UUID;
 public class Utils {
   private static Logger LOG = LoggerFactory.getLogger(Utils.class);
   public static final String BOOTSTRAP_DUMP_STATE_KEY_PREFIX = "bootstrap.dump.state.";
+  private static final int DEF_BUF_SIZE = 8 * 1024;
 
   public enum ReplDumpState {
     IDLE, ACTIVE
@@ -97,6 +100,51 @@ public class Utils {
     }
   }
 
+  /**
+   * Copies {@code is} into a newly created file at {@code exportFilePath}.
+   * <p>
+   * Only failures that happen before the source stream is first read (e.g. creating the file) are
+   * retried. Once reading has started the stream cannot be rewound, so a retry would silently write
+   * a truncated file; such failures are reported instead.
+   *
+   * @param fs file system on which the file is created
+   * @param exportFilePath path of the file to create
+   * @param is source of the file contents; read to its end but not closed here
+   * @return number of bytes written
+   * @throws SemanticException if the file could not be written
+   */
+  public static long writeFile(FileSystem fs, Path exportFilePath, InputStream is) throws SemanticException {
+    Retry<Long> retriable = new Retry<Long>(IOException.class) {
+      // Becomes true on the first read from the source stream; from then on retrying is unsafe.
+      private boolean sourceConsumed = false;
+
+      @Override
+      public Long execute() throws IOException {
+        try (FSDataOutputStream fos = fs.create(exportFilePath)) {
+          byte[] buffer = new byte[DEF_BUF_SIZE];
+          int bytesRead;
+          sourceConsumed = true;
+          while ((bytesRead = is.read(buffer)) != -1) {
+            fos.write(buffer, 0, bytesRead);
+          }
+          return fos.getPos();
+        } catch (IOException e) {
+          if (sourceConsumed) {
+            // Not an IOException, so Retry(IOException.class) is not expected to re-run this attempt.
+            throw new java.io.UncheckedIOException("Failed to write " + exportFilePath
+                + " after the source stream was partially read; not retrying", e);
+          }
+          throw e;
+        }
+      }
+    };
+    try {
+      return retriable.run();
+    } catch (Exception e) {
+      throw new SemanticException(e);
+    }
+  }
+
   public static void writeOutput(String content, Path outputFile, HiveConf hiveConf) throws SemanticException {
     Retry<Void> retriable = new Retry<Void>(IOException.class) {