This is an automated email from the ASF dual-hosted git repository.
anishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 70507fa HIVE-24267: RetryingClientTimeBased should always perform first invocation (Pravin Kumar Sinha, reviewed by Aasha Medhi)
70507fa is described below
commit 70507fa167663b65949f1ef17576a8755f093f85
Author: Anishek Agarwal <[email protected]>
AuthorDate: Mon Oct 19 14:16:07 2020 +0530
HIVE-24267: RetryingClientTimeBased should always perform first invocation (Pravin Kumar Sinha, reviewed by Aasha Medhi)
---
.../hadoop/hive/ql/exec/repl/AtlasDumpTask.java | 10 ++--
.../hadoop/hive/ql/exec/repl/ReplLoadTask.java | 3 +-
.../ql/exec/repl/atlas/AtlasRestClientImpl.java | 5 +-
.../exec/repl/atlas/RetryingClientTimeBased.java | 13 +++--
.../ql/exec/repl/ranger/RangerRestClientImpl.java | 10 ++--
.../hive/ql/exec/repl/TestAtlasDumpTask.java | 61 ++++++++++++++++++++++
6 files changed, 88 insertions(+), 14 deletions(-)
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java
index 45d67c6..2f6b918 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java
@@ -194,9 +194,13 @@ public class AtlasDumpTask extends Task<AtlasDumpWork>
implements Serializable {
AtlasExportRequest exportRequest =
atlasRequestBuilder.createExportRequest(atlasReplInfo,
atlasReplInfo.getSrcCluster());
inputStream = atlasRestClient.exportData(exportRequest);
- FileSystem fs =
atlasReplInfo.getStagingDir().getFileSystem(atlasReplInfo.getConf());
- Path exportFilePath = new Path(atlasReplInfo.getStagingDir(),
ReplUtils.REPL_ATLAS_EXPORT_FILE_NAME);
- numBytesWritten = Utils.writeFile(fs, exportFilePath, inputStream, conf);
+ if (inputStream == null) {
+ LOG.info("There is no Atlas metadata to be exported");
+ } else {
+ FileSystem fs =
atlasReplInfo.getStagingDir().getFileSystem(atlasReplInfo.getConf());
+ Path exportFilePath = new Path(atlasReplInfo.getStagingDir(),
ReplUtils.REPL_ATLAS_EXPORT_FILE_NAME);
+ numBytesWritten = Utils.writeFile(fs, exportFilePath, inputStream,
conf);
+ }
} catch (SemanticException ex) {
throw ex;
} catch (Exception ex) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
index 61b3652..3c6fdaa 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
@@ -180,7 +180,8 @@ public class ReplLoadTask extends Task<ReplLoadWork>
implements Serializable {
private void addAtlasLoadTask() throws HiveException {
Path atlasDumpDir = new Path(new Path(work.dumpDirectory).getParent(),
ReplUtils.REPL_ATLAS_BASE_DIR);
LOG.info("Adding task to load Atlas metadata from {} ", atlasDumpDir);
- AtlasLoadWork atlasLoadWork = new AtlasLoadWork(work.getSourceDbName(),
work.dbNameToLoadIn, atlasDumpDir,
+ String targetDbName = StringUtils.isEmpty(work.dbNameToLoadIn) ?
work.getSourceDbName() : work.dbNameToLoadIn;
+ AtlasLoadWork atlasLoadWork = new AtlasLoadWork(work.getSourceDbName(),
targetDbName, atlasDumpDir,
work.getMetricCollector());
Task<?> atlasLoadTask = TaskFactory.get(atlasLoadWork, conf);
if (childTasks == null) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClientImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClientImpl.java
index e4b294d..ed7485d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClientImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClientImpl.java
@@ -95,7 +95,7 @@ public class AtlasRestClientImpl extends
RetryingClientTimeBased implements Atla
SecurityUtils.reloginExpiringKeytabUser();
return clientV2.exportData(request);
}
- }, null);
+ });
}
public AtlasImportResult importData(AtlasImportRequest request,
AtlasReplInfo atlasReplInfo) throws Exception {
@@ -103,6 +103,7 @@ public class AtlasRestClientImpl extends
RetryingClientTimeBased implements Atla
Path exportFilePath = new Path(atlasReplInfo.getStagingDir(),
ReplUtils.REPL_ATLAS_EXPORT_FILE_NAME);
FileSystem fs = FileSystem.get(exportFilePath.toUri(),
atlasReplInfo.getConf());
if (!fs.exists(exportFilePath)) {
+ LOG.info("There is nothing to load, returning the default result.");
return defaultResult;
}
LOG.debug("Atlas import data request: {}" + request);
@@ -120,7 +121,7 @@ public class AtlasRestClientImpl extends
RetryingClientTimeBased implements Atla
}
}
}
- }, defaultResult);
+ });
}
private AtlasImportResult getDefaultAtlasImportResult(AtlasImportRequest
request) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/RetryingClientTimeBased.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/RetryingClientTimeBased.java
index 6ddb114..c84567e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/RetryingClientTimeBased.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/RetryingClientTimeBased.java
@@ -43,10 +43,10 @@ public class RetryingClientTimeBased {
protected double backOff;
protected int maxJitterInSeconds;
- protected <T> T invokeWithRetry(Callable<T> func, T defaultReturnValue)
throws SemanticException {
+ protected <T> T invokeWithRetry(Callable<T> func) throws SemanticException {
long startTime = System.currentTimeMillis();
long delay = this.initialDelayInSeconds;
- while (elapsedTimeInSeconds(startTime) + delay >
this.totalDurationInSeconds) {
+ while (true) {
try {
LOG.debug("Retrying method: {}", func.getClass().getName(), null);
return func.call();
@@ -54,16 +54,19 @@ public class RetryingClientTimeBased {
if (processImportExportLockException(e, delay)) {
//retry case. compute next sleep time
delay = getNextDelay(delay);
+ if (elapsedTimeInSeconds(startTime) + delay >
this.totalDurationInSeconds) {
+ throw new
SemanticException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(), e);
+ }
continue;
}
if (processInvalidParameterException(e)) {
+ LOG.info("There is nothing to export/import.");
return null;
}
LOG.error(func.getClass().getName(), e);
throw new SemanticException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(), e);
}
}
- return defaultReturnValue;
}
private long getNextDelay(long currentDelay) {
@@ -109,8 +112,8 @@ public class RetryingClientTimeBased {
String excMessage = e.getMessage() == null ? "" : e.getMessage();
if (excMessage.contains(ERROR_MESSAGE_IN_PROGRESS)) {
try {
- LOG.info("Atlas in-progress operation detected. Will pause for: {}
ms", delay);
- Thread.sleep(delay);
+ LOG.info("Atlas in-progress operation detected. Will pause for: {}
seconds", delay);
+ Thread.sleep(delay * 1000L);
} catch (InterruptedException intEx) {
LOG.error("Pause wait interrupted!", intEx);
throw new SemanticException(intEx);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ranger/RangerRestClientImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ranger/RangerRestClientImpl.java
index 31081ab..5b2fe4e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ranger/RangerRestClientImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ranger/RangerRestClientImpl.java
@@ -202,9 +202,13 @@ public class RangerRestClientImpl implements
RangerRestClient {
Retryable retryable = Retryable.builder()
.withHiveConf(hiveConf)
.withRetryOnException(Exception.class).build();
- return retryable.executeCallable(() ->
importRangerPoliciesPlain(jsonRangerExportPolicyList,
- rangerPoliciesJsonFileName,
- serviceMapJsonFileName, jsonServiceMap, finalUrl,
rangerExportPolicyList));
+ try {
+ return retryable.executeCallable(() ->
importRangerPoliciesPlain(jsonRangerExportPolicyList,
+ rangerPoliciesJsonFileName,
+ serviceMapJsonFileName, jsonServiceMap, finalUrl,
rangerExportPolicyList));
+ } catch (Exception e) {
+ throw new
SemanticException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage()), e);
+ }
}
private RangerExportPolicyList importRangerPoliciesPlain(String
jsonRangerExportPolicyList,
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestAtlasDumpTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestAtlasDumpTask.java
index fbc6dfc..935dc99 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestAtlasDumpTask.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestAtlasDumpTask.java
@@ -18,16 +18,21 @@
package org.apache.hadoop.hive.ql.exec.repl;
+import org.apache.atlas.AtlasClientV2;
+import org.apache.atlas.AtlasServiceException;
+import org.apache.atlas.model.impexp.AtlasExportRequest;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasReplInfo;
import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasRequestBuilder;
import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasRestClient;
import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasRestClientBuilder;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasRestClientImpl;
import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.parse.repl.ReplState;
import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector;
+import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.junit.Assert;
import org.junit.Test;
@@ -43,7 +48,11 @@ import org.slf4j.LoggerFactory;
import static org.mockito.ArgumentMatchers.any;
+import java.io.ByteArrayInputStream;
import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.TimeUnit;
import static org.mockito.Mockito.mock;
import static org.powermock.api.mockito.PowerMockito.mockStatic;
@@ -114,4 +123,56 @@ public class TestAtlasDumpTask {
AtlasRestClient atlasClient = atlasRestCleintBuilder.getClient(conf);
Assert.assertTrue(atlasClient != null);
}
+
+ @Test
+ public void testRetryingClientTimeBased() throws SemanticException,
IOException, AtlasServiceException {
+ AtlasClientV2 atlasClientV2 = mock(AtlasClientV2.class);
+ AtlasExportRequest exportRequest = mock(AtlasExportRequest.class);
+ String exportResponseData = "dumpExportContent";
+ InputStream exportedMetadataIS = new
ByteArrayInputStream(exportResponseData.getBytes(StandardCharsets.UTF_8));
+
when(atlasClientV2.exportData(any(AtlasExportRequest.class))).thenReturn(exportedMetadataIS);
+ when(exportRequest.toString()).thenReturn("dummyExportRequest");
+ when(conf.getTimeVar(HiveConf.ConfVars.REPL_RETRY_TOTAL_DURATION,
TimeUnit.SECONDS)).thenReturn(60L);
+ when(conf.getTimeVar(HiveConf.ConfVars.REPL_RETRY_INTIAL_DELAY,
TimeUnit.SECONDS)).thenReturn(1L);
+ AtlasRestClient atlasClient = new AtlasRestClientImpl(atlasClientV2, conf);
+ AtlasRestClientImpl atlasRestClientImpl = (AtlasRestClientImpl)atlasClient;
+ InputStream inputStream = atlasRestClientImpl.exportData(exportRequest);
+ ArgumentCaptor<AtlasExportRequest> expReqCaptor =
ArgumentCaptor.forClass(AtlasExportRequest.class);
+ Mockito.verify(atlasClientV2,
Mockito.times(1)).exportData(expReqCaptor.capture());
+ Assert.assertEquals(expReqCaptor.getValue().toString(),
"dummyExportRequest");
+ byte[] exportResponseDataReadBytes = new byte[exportResponseData.length()];
+ inputStream.read(exportResponseDataReadBytes);
+ String exportResponseDataReadString = new
String(exportResponseDataReadBytes, StandardCharsets.UTF_8);
+ Assert.assertEquals(exportResponseData, exportResponseDataReadString);
+ }
+
+ @Test
+ public void testRetryingClientTimeBasedExhausted() throws
AtlasServiceException {
+ AtlasClientV2 atlasClientV2 = mock(AtlasClientV2.class);
+ AtlasExportRequest exportRequest = mock(AtlasExportRequest.class);
+ AtlasServiceException atlasServiceException =
mock(AtlasServiceException.class);
+ when(atlasServiceException.getMessage()).thenReturn("import or export is
in progress");
+
when(atlasClientV2.exportData(any(AtlasExportRequest.class))).thenThrow(atlasServiceException);
+ when(exportRequest.toString()).thenReturn("dummyExportRequest");
+ when(conf.getTimeVar(HiveConf.ConfVars.REPL_RETRY_TOTAL_DURATION,
TimeUnit.SECONDS)).thenReturn(60L);
+ when(conf.getTimeVar(HiveConf.ConfVars.REPL_RETRY_INTIAL_DELAY,
TimeUnit.SECONDS)).thenReturn(10L);
+
when(conf.getTimeVar(HiveConf.ConfVars.REPL_RETRY_MAX_DELAY_BETWEEN_RETRIES,
TimeUnit.SECONDS)).thenReturn(20L);
+
when(conf.getFloatVar(HiveConf.ConfVars.REPL_RETRY_BACKOFF_COEFFICIENT)).thenReturn(2.0f);
+ AtlasRestClient atlasClient = new AtlasRestClientImpl(atlasClientV2, conf);
+ AtlasRestClientImpl atlasRestClientImpl = (AtlasRestClientImpl)atlasClient;
+ InputStream inputStream = null;
+ try {
+ inputStream = atlasRestClientImpl.exportData(exportRequest);
+ Assert.fail("Should have thrown SemanticException.");
+ } catch (SemanticException ex) {
+ Assert.assertTrue(ex.getMessage().contains("Retry exhausted for
retryable error code"));
+ Assert.assertTrue(atlasServiceException == ex.getCause());
+ }
+ ArgumentCaptor<AtlasExportRequest> expReqCaptor =
ArgumentCaptor.forClass(AtlasExportRequest.class);
+ Mockito.verify(atlasClientV2,
Mockito.times(3)).exportData(expReqCaptor.capture());
+ for (AtlasExportRequest atlasExportRequest: expReqCaptor.getAllValues()) {
+ Assert.assertEquals(atlasExportRequest.toString(), "dummyExportRequest");
+ }
+ Assert.assertTrue(inputStream == null);
+ }
}