[
https://issues.apache.org/jira/browse/HIVE-23353?focusedWorklogId=434395&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-434395
]
ASF GitHub Bot logged work on HIVE-23353:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 18/May/20 13:51
Start Date: 18/May/20 13:51
Worklog Time Spent: 10m
Work Description: aasha commented on a change in pull request #1021:
URL: https://github.com/apache/hive/pull/1021#discussion_r426596518
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasProcess;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasReplInfo;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasExportProcess;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
+import org.apache.hadoop.hive.ql.plan.api.StageType;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Atlas Metadata Replication Task.
+ **/
+public class AtlasDumpTask extends Task<AtlasDumpWork> implements Serializable
{
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public int execute() {
+ try {
+ AtlasReplInfo atlasReplInfo = work.getAtlasReplInfo();
+ LOG.info("Dumping Atlas metadata of srcDb: {}, for TgtDb: {} to staging
location:",
+ atlasReplInfo.getSrcDB(), atlasReplInfo.getTgtDB(),
atlasReplInfo.getStagingDir());
+ AtlasProcess atlasProcess = new AtlasExportProcess();
+ String entityGuid =
atlasProcess.checkHiveEntityGuid(atlasReplInfo.getAtlasEndpoint(),
+ atlasReplInfo.getSrcCluster(), atlasReplInfo.getSrcDB(), conf);
+ long currentModifiedTime =
atlasProcess.getCurrentTimestamp(atlasReplInfo, entityGuid);
+ atlasProcess.run(atlasReplInfo);
+ createDumpMetadata(atlasReplInfo, currentModifiedTime);
+ return 0;
+ } catch (Exception e) {
+ LOG.error("Exception during AtlasDumpTask.execute", e);
+ setException(e);
+ return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode();
+ }
+ }
+
+ private void createDumpMetadata(AtlasReplInfo atlasReplInfo, long
lastModifiedTime) throws SemanticException {
+ Path dumpFile = new Path(atlasReplInfo.getStagingDir(),
EximUtil.METADATA_NAME);
+ List<List<String>> listValues = new ArrayList<>();
+ listValues.add(
+ Arrays.asList(
+ atlasReplInfo.getSrcFsUri(),
+ String.valueOf(lastModifiedTime)
+ )
+ );
+ Utils.writeOutput(listValues, dumpFile, conf, true);
+ LOG.debug("Stored metadata for Atlas dump at:", dumpFile.toString());
+ }
+
+ @Override
+ public StageType getType() {
+ return StageType.ATLAS_REPL_DUMP;
+ }
+
+ @Override
+ public String getName() {
+ return "ATLAS_DUMP_TASK";
Review comment:
Name and stage type are generally the same for all the tasks.
##########
File path:
itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
##########
@@ -1561,4 +1562,94 @@ public void
testFailureUnsupportedAuthorizerReplication() throws Throwable {
assertEquals("Authorizer sentry not supported for replication ",
e.getMessage());
}
}
+
+ //Testing just the configs and no impact on existing replication
+ @Test
+ public void testAtlasReplication() throws Throwable {
+ Map<String, String> confMap = defaultAtlasConfMap();
+ primary.run("use " + primaryDbName)
+ .run("create table acid_table (key int, value int) partitioned by
(load_date date) " +
+ "clustered by(key) into 2 buckets stored as orc
tblproperties ('transactional'='true')")
+ .run("create table table1 (i String)")
+ .run("insert into table1 values (1)")
+ .run("insert into table1 values (2)")
+ .dump(primaryDbName, getAtlasClause(defaultAtlasConfMap()));
+
+ confMap.remove("hive.repl.atlas.replicatedto");
Review comment:
What happens if it's also set on the target cluster?
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpWork.java
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.utils.StringUtils;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasReplInfo;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.Explain;
+import org.apache.hadoop.hive.ql.plan.Explain.Level;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.Serializable;
+import java.nio.charset.Charset;
+
+/**
+ * Atlas metadata replication work.
+ */
+@Explain(displayName = "Atlas Meta Data Dump Work", explainLevels = {
Level.USER, Level.DEFAULT, Level.EXTENDED })
+public class AtlasDumpWork implements Serializable {
+ private static final long serialVersionUID = 1L;
+ private final String srcDB;
+ private final Path stagingDir;
+ private final HiveConf conf;
+ private final boolean bootstrap;
+ private final Path prevAtlasDumpDir;
+
+
+ public AtlasDumpWork(String srcDB, Path stagingDir, boolean bootstrap, Path
prevAtlasDumpDir, HiveConf conf) {
+ this.srcDB = srcDB;
+ this.stagingDir = stagingDir;
+ this.bootstrap = bootstrap;
+ this.prevAtlasDumpDir = prevAtlasDumpDir;
+ this.conf = conf;
+ }
+
+ public AtlasReplInfo getAtlasReplInfo() throws SemanticException{
Review comment:
Can this info be part of the AtlasDumpWork?
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpWork.java
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.utils.StringUtils;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasReplInfo;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.Explain;
+import org.apache.hadoop.hive.ql.plan.Explain.Level;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.Serializable;
+import java.nio.charset.Charset;
+
+/**
+ * Atlas metadata replication work.
+ */
+@Explain(displayName = "Atlas Meta Data Dump Work", explainLevels = {
Level.USER, Level.DEFAULT, Level.EXTENDED })
+public class AtlasDumpWork implements Serializable {
+ private static final long serialVersionUID = 1L;
+ private final String srcDB;
+ private final Path stagingDir;
+ private final HiveConf conf;
+ private final boolean bootstrap;
+ private final Path prevAtlasDumpDir;
+
+
+ public AtlasDumpWork(String srcDB, Path stagingDir, boolean bootstrap, Path
prevAtlasDumpDir, HiveConf conf) {
+ this.srcDB = srcDB;
+ this.stagingDir = stagingDir;
+ this.bootstrap = bootstrap;
+ this.prevAtlasDumpDir = prevAtlasDumpDir;
+ this.conf = conf;
+ }
+
+ public AtlasReplInfo getAtlasReplInfo() throws SemanticException{
+ String endpoint =
getNonEmpty(HiveConf.ConfVars.REPL_ATLAS_ENDPOINT.varname);
+ String tgtDB =
getNonEmpty(HiveConf.ConfVars.REPL_ATLAS_REPLICATED_TO_DB.varname);
+ String srcCluster =
getNonEmpty(HiveConf.ConfVars.REPL_SOURCE_CLUSTER_NAME.varname);
+ String tgtCluster =
getNonEmpty(HiveConf.ConfVars.REPL_TARGET_CLUSTER_NAME.varname);
+ AtlasReplInfo atlasReplInfo = new AtlasReplInfo(endpoint, srcDB, tgtDB,
srcCluster, tgtCluster, stagingDir, conf);
+ atlasReplInfo.setSrcFsUri(conf.get(ReplUtils.DEFAULT_FS_CONFIG));
+ long lastTimeStamp = isBootstrap() ? 0L : lastStoredTimeStamp();
+ atlasReplInfo.setTimeStamp(lastTimeStamp);
+ return atlasReplInfo;
+ }
+
+ private long lastStoredTimeStamp() throws SemanticException {
+ Path prevMetadataPath = new Path(getPrevAtlasDumpDir(),
EximUtil.METADATA_NAME);
+ BufferedReader br = null;
+ try {
+ FileSystem fs = prevMetadataPath.getFileSystem(conf);
+ br = new BufferedReader(new InputStreamReader(fs.open(prevMetadataPath),
Charset.defaultCharset()));
+ String[] lineContents = br.readLine().split("\t", 5);
+ return Long.parseLong(lineContents[1]);
+ } catch (Exception ex) {
+ throw new SemanticException(ex);
+ } finally {
+ if (br != null) {
+ try {
+ br.close();
+ } catch (IOException e) {
+ throw new SemanticException(e);
+ }
+ }
+ }
+ }
+
+ private String getNonEmpty(String config) throws SemanticException {
+ String val = conf.get(config);
+ if (StringUtils.isEmpty(val)) {
+ throw new SemanticException(config + " is mandatory config for Atlas
metadata replication");
+ }
+ return val;
+ }
+
+ public boolean isBootstrap() {
+ return bootstrap;
+ }
+
+ public Path getPrevAtlasDumpDir() {
Review comment:
why is this needed?
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpWork.java
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.utils.StringUtils;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasReplInfo;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.Explain;
+import org.apache.hadoop.hive.ql.plan.Explain.Level;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.Serializable;
+import java.nio.charset.Charset;
+
+/**
+ * Atlas metadata replication work.
+ */
+@Explain(displayName = "Atlas Meta Data Dump Work", explainLevels = {
Level.USER, Level.DEFAULT, Level.EXTENDED })
+public class AtlasDumpWork implements Serializable {
+ private static final long serialVersionUID = 1L;
+ private final String srcDB;
+ private final Path stagingDir;
+ private final HiveConf conf;
+ private final boolean bootstrap;
+ private final Path prevAtlasDumpDir;
+
+
+ public AtlasDumpWork(String srcDB, Path stagingDir, boolean bootstrap, Path
prevAtlasDumpDir, HiveConf conf) {
+ this.srcDB = srcDB;
+ this.stagingDir = stagingDir;
+ this.bootstrap = bootstrap;
+ this.prevAtlasDumpDir = prevAtlasDumpDir;
+ this.conf = conf;
+ }
+
+ public AtlasReplInfo getAtlasReplInfo() throws SemanticException{
+ String endpoint =
getNonEmpty(HiveConf.ConfVars.REPL_ATLAS_ENDPOINT.varname);
+ String tgtDB =
getNonEmpty(HiveConf.ConfVars.REPL_ATLAS_REPLICATED_TO_DB.varname);
+ String srcCluster =
getNonEmpty(HiveConf.ConfVars.REPL_SOURCE_CLUSTER_NAME.varname);
+ String tgtCluster =
getNonEmpty(HiveConf.ConfVars.REPL_TARGET_CLUSTER_NAME.varname);
+ AtlasReplInfo atlasReplInfo = new AtlasReplInfo(endpoint, srcDB, tgtDB,
srcCluster, tgtCluster, stagingDir, conf);
+ atlasReplInfo.setSrcFsUri(conf.get(ReplUtils.DEFAULT_FS_CONFIG));
+ long lastTimeStamp = isBootstrap() ? 0L : lastStoredTimeStamp();
+ atlasReplInfo.setTimeStamp(lastTimeStamp);
+ return atlasReplInfo;
+ }
+
+ private long lastStoredTimeStamp() throws SemanticException {
+ Path prevMetadataPath = new Path(getPrevAtlasDumpDir(),
EximUtil.METADATA_NAME);
+ BufferedReader br = null;
+ try {
+ FileSystem fs = prevMetadataPath.getFileSystem(conf);
+ br = new BufferedReader(new InputStreamReader(fs.open(prevMetadataPath),
Charset.defaultCharset()));
+ String[] lineContents = br.readLine().split("\t", 5);
+ return Long.parseLong(lineContents[1]);
+ } catch (Exception ex) {
+ throw new SemanticException(ex);
+ } finally {
+ if (br != null) {
+ try {
+ br.close();
+ } catch (IOException e) {
+ throw new SemanticException(e);
+ }
+ }
+ }
+ }
+
+ private String getNonEmpty(String config) throws SemanticException {
+ String val = conf.get(config);
+ if (StringUtils.isEmpty(val)) {
+ throw new SemanticException(config + " is mandatory config for Atlas
metadata replication");
+ }
+ return val;
+ }
+
+ public boolean isBootstrap() {
Review comment:
Why is this different for bootstrap and incremental?
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasLoadWork.java
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.utils.StringUtils;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasReplInfo;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.Explain;
+import org.apache.hadoop.hive.ql.plan.Explain.Level;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.Serializable;
+import java.nio.charset.Charset;
+
+/**
+ * Atlas metadata replication load work.
+ */
+@Explain(displayName = "Atlas Meta Data Load Work", explainLevels =
{Level.USER, Level.DEFAULT, Level.EXTENDED})
+public class AtlasLoadWork implements Serializable {
+ private static final long serialVersionUID = 1L;
+ private final String srcDB;
+ private final String tgtDB;
+ private final Path stagingDir;
+ private final HiveConf conf;
+
+ public AtlasLoadWork(String srcDB, String tgtDB, Path stagingDir, HiveConf
conf) {
+ this.srcDB = srcDB;
+ this.tgtDB = tgtDB;
+ this.stagingDir = stagingDir;
+ this.conf = conf;
+ }
+
+ public AtlasReplInfo getAtlasReplInfo() throws SemanticException {
+ String endpoint =
getNonEmpty(HiveConf.ConfVars.REPL_ATLAS_ENDPOINT.varname);
+ String srcCluster =
getNonEmpty(HiveConf.ConfVars.REPL_SOURCE_CLUSTER_NAME.varname);
+ String tgtCluster =
getNonEmpty(HiveConf.ConfVars.REPL_TARGET_CLUSTER_NAME.varname);
+ AtlasReplInfo atlasReplInfo = new AtlasReplInfo(endpoint, srcDB, tgtDB,
srcCluster, tgtCluster, stagingDir, conf);
+ atlasReplInfo.setSrcFsUri(getStoredFsUri(atlasReplInfo.getStagingDir()));
+ atlasReplInfo.setTgtFsUri(conf.get(ReplUtils.DEFAULT_FS_CONFIG));
+ return atlasReplInfo;
+ }
+
+ private String getStoredFsUri(Path atlasDumpDir) throws SemanticException {
+ Path metadataPath = new Path(atlasDumpDir, EximUtil.METADATA_NAME);
+ BufferedReader br = null;
+ try {
+ FileSystem fs = metadataPath.getFileSystem(conf);
+ br = new BufferedReader(new InputStreamReader(fs.open(metadataPath),
Charset.defaultCharset()));
+ String[] lineContents = br.readLine().split("\t", 5);
+ return lineContents[0];
+ } catch (Exception ex) {
+ throw new SemanticException(ex);
+ } finally {
+ if (br != null) {
+ try {
+ br.close();
+ } catch (IOException e) {
+ throw new SemanticException(e);
+ }
+ }
+ }
+ }
+
+ private String getNonEmpty(String config) throws SemanticException {
Review comment:
This can be moved to a common place to avoid duplication.
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasLoadWork.java
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.utils.StringUtils;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasReplInfo;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.Explain;
+import org.apache.hadoop.hive.ql.plan.Explain.Level;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.Serializable;
+import java.nio.charset.Charset;
+
+/**
+ * Atlas metadata replication load work.
+ */
+@Explain(displayName = "Atlas Meta Data Load Work", explainLevels =
{Level.USER, Level.DEFAULT, Level.EXTENDED})
+public class AtlasLoadWork implements Serializable {
+ private static final long serialVersionUID = 1L;
+ private final String srcDB;
+ private final String tgtDB;
+ private final Path stagingDir;
+ private final HiveConf conf;
+
+ public AtlasLoadWork(String srcDB, String tgtDB, Path stagingDir, HiveConf
conf) {
+ this.srcDB = srcDB;
+ this.tgtDB = tgtDB;
+ this.stagingDir = stagingDir;
+ this.conf = conf;
+ }
+
+ public AtlasReplInfo getAtlasReplInfo() throws SemanticException {
+ String endpoint =
getNonEmpty(HiveConf.ConfVars.REPL_ATLAS_ENDPOINT.varname);
+ String srcCluster =
getNonEmpty(HiveConf.ConfVars.REPL_SOURCE_CLUSTER_NAME.varname);
+ String tgtCluster =
getNonEmpty(HiveConf.ConfVars.REPL_TARGET_CLUSTER_NAME.varname);
+ AtlasReplInfo atlasReplInfo = new AtlasReplInfo(endpoint, srcDB, tgtDB,
srcCluster, tgtCluster, stagingDir, conf);
+ atlasReplInfo.setSrcFsUri(getStoredFsUri(atlasReplInfo.getStagingDir()));
+ atlasReplInfo.setTgtFsUri(conf.get(ReplUtils.DEFAULT_FS_CONFIG));
+ return atlasReplInfo;
+ }
+
+ private String getStoredFsUri(Path atlasDumpDir) throws SemanticException {
Review comment:
This can be extracted into a common method shared with AtlasDumpWork.
##########
File path:
ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRESTClient.java
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.atlas;
+
+import org.apache.atlas.AtlasClientV2;
+import org.apache.atlas.AtlasServiceException;
+import org.apache.atlas.model.impexp.AtlasExportRequest;
+import org.apache.atlas.model.impexp.AtlasImportRequest;
+import org.apache.atlas.model.impexp.AtlasImportResult;
+import org.apache.atlas.model.impexp.AtlasServer;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Callable;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static com.sun.jersey.api.client.ClientResponse.Status.NOT_FOUND;
+
+/**
+ * Atlas RESTClient interface for Atlas' REST APIs.
+ */
+public interface AtlasRESTClient {
Review comment:
The name should follow the class naming convention.
##########
File path:
ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/DummyAtlasRESTClient.java
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.atlas;
+
+import org.apache.atlas.model.impexp.AtlasExportRequest;
+import org.apache.atlas.model.impexp.AtlasImportRequest;
+import org.apache.atlas.model.impexp.AtlasImportResult;
+import org.apache.atlas.model.impexp.AtlasServer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.nio.charset.Charset;
+import java.util.UUID;
+
+/**
+ * Dummy implementation of RESTClient, encapsulates Atlas' REST APIs.
+ * To be used for testing.
+ */
+public class DummyAtlasRESTClient implements AtlasRESTClient {
Review comment:
This can be named NoOp instead of Dummy.
##########
File path:
ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasProcess.java
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.atlas;
+
+import org.apache.atlas.model.impexp.AtlasServer;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Base class for Atlas Processes, viz. Export & Import
+ */
+public abstract class AtlasProcess {
+ private static final String CLUSTER_NAME_SEPARATOR = "$";
Review comment:
This constant can be moved to a utils class.
##########
File path:
ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasImportProcess.java
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.atlas;
+
+import org.apache.atlas.model.impexp.AtlasImportRequest;
+import org.apache.atlas.model.impexp.AtlasImportResult;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Performs Atlas' Import.
+ */
+public class AtlasImportProcess extends AtlasProcess {
+ protected static final Logger LOG =
LoggerFactory.getLogger(AtlasImportProcess.class);
Review comment:
This can be part of AtlasLoadTask.
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
##########
@@ -107,6 +107,9 @@ public int execute() {
}
work.setRootTask(this);
this.parentTasks = null;
+ if (shouldLoadAtlasMetadata()) {
Review comment:
Should this be after the Ranger load?
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasProcess;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasReplInfo;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasExportProcess;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
+import org.apache.hadoop.hive.ql.plan.api.StageType;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Atlas Metadata Replication Task.
+ **/
+public class AtlasDumpTask extends Task<AtlasDumpWork> implements Serializable
{
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public int execute() {
+ try {
+ AtlasReplInfo atlasReplInfo = work.getAtlasReplInfo();
+ LOG.info("Dumping Atlas metadata of srcDb: {}, for TgtDb: {} to staging
location:",
+ atlasReplInfo.getSrcDB(), atlasReplInfo.getTgtDB(),
atlasReplInfo.getStagingDir());
+ AtlasProcess atlasProcess = new AtlasExportProcess();
+ String entityGuid =
atlasProcess.checkHiveEntityGuid(atlasReplInfo.getAtlasEndpoint(),
+ atlasReplInfo.getSrcCluster(), atlasReplInfo.getSrcDB(), conf);
+ long currentModifiedTime =
atlasProcess.getCurrentTimestamp(atlasReplInfo, entityGuid);
+ atlasProcess.run(atlasReplInfo);
+ createDumpMetadata(atlasReplInfo, currentModifiedTime);
+ return 0;
+ } catch (Exception e) {
+ LOG.error("Exception during AtlasDumpTask.execute", e);
+ setException(e);
+ return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode();
+ }
+ }
+
+ private void createDumpMetadata(AtlasReplInfo atlasReplInfo, long
lastModifiedTime) throws SemanticException {
Review comment:
Please add a test for this.
##########
File path:
ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasExportProcess.java
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.atlas;
+
+import org.apache.atlas.model.impexp.AtlasExportRequest;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * Performs Atlas metadata export.
+ */
+public class AtlasExportProcess extends AtlasProcess {
+ private FileSystem fileSystem = null;
+ protected static final Logger LOG =
LoggerFactory.getLogger(AtlasExportProcess.class);
+ private static final int DEF_BUF_SIZE = 8 * 1024;
+
+ public void run(AtlasReplInfo atlasReplInfo) throws SemanticException {
Review comment:
This can be part of AtlasDumpTask; a separate process class is not
needed for this.
##########
File path:
ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRESTClientImpl.java
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.atlas;
+
+import org.apache.atlas.AtlasClientV2;
+import org.apache.atlas.AtlasServiceException;
+import org.apache.atlas.model.impexp.AtlasExportRequest;
+import org.apache.atlas.model.impexp.AtlasImportRequest;
+import org.apache.atlas.model.impexp.AtlasImportResult;
+import org.apache.atlas.model.impexp.AtlasServer;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static com.sun.jersey.api.client.ClientResponse.Status.NOT_FOUND;
+
+/**
+ * Implementation of RESTClient, encapsulates Atlas' REST APIs.
+ */
+public class AtlasRESTClientImpl extends RetryingClient implements
AtlasRESTClient{
Review comment:
Please follow the standard class naming/formatting convention in this declaration.
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
##########
@@ -149,6 +149,10 @@ public int execute() {
if (shouldDumpAuthorizationMetadata()) {
initiateAuthorizationDumpTask();
}
+ if (shouldDumpAtlasMetadata()) {
Review comment:
Should this run before the Ranger metadata dump?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 434395)
Time Spent: 20m (was: 10m)
> Atlas metadata replication scheduling
> -------------------------------------
>
> Key: HIVE-23353
> URL: https://issues.apache.org/jira/browse/HIVE-23353
> Project: Hive
> Issue Type: Task
> Reporter: PRAVIN KUMAR SINHA
> Assignee: PRAVIN KUMAR SINHA
> Priority: Major
> Labels: pull-request-available
> Attachments: HIVE-23353.01.patch
>
> Time Spent: 20m
> Remaining Estimate: 0h
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)