This is an automated email from the ASF dual-hosted git repository.

anishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 70507fa  HIVE-24267: RetryingClientTimeBased should always perform 
first invocation (Pravin Kumar Sinha, reviewed by Aasha Medhi)
70507fa is described below

commit 70507fa167663b65949f1ef17576a8755f093f85
Author: Anishek Agarwal <[email protected]>
AuthorDate: Mon Oct 19 14:16:07 2020 +0530

    HIVE-24267: RetryingClientTimeBased should always perform first invocation 
(Pravin Kumar Sinha, reviewed by Aasha Medhi)
---
 .../hadoop/hive/ql/exec/repl/AtlasDumpTask.java    | 10 ++--
 .../hadoop/hive/ql/exec/repl/ReplLoadTask.java     |  3 +-
 .../ql/exec/repl/atlas/AtlasRestClientImpl.java    |  5 +-
 .../exec/repl/atlas/RetryingClientTimeBased.java   | 13 +++--
 .../ql/exec/repl/ranger/RangerRestClientImpl.java  | 10 ++--
 .../hive/ql/exec/repl/TestAtlasDumpTask.java       | 61 ++++++++++++++++++++++
 6 files changed, 88 insertions(+), 14 deletions(-)

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java
index 45d67c6..2f6b918 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java
@@ -194,9 +194,13 @@ public class AtlasDumpTask extends Task<AtlasDumpWork> 
implements Serializable {
       AtlasExportRequest exportRequest = 
atlasRequestBuilder.createExportRequest(atlasReplInfo,
               atlasReplInfo.getSrcCluster());
       inputStream = atlasRestClient.exportData(exportRequest);
-      FileSystem fs = 
atlasReplInfo.getStagingDir().getFileSystem(atlasReplInfo.getConf());
-      Path exportFilePath = new Path(atlasReplInfo.getStagingDir(), 
ReplUtils.REPL_ATLAS_EXPORT_FILE_NAME);
-      numBytesWritten = Utils.writeFile(fs, exportFilePath, inputStream, conf);
+      if (inputStream == null) {
+        LOG.info("There is no Atlas metadata to be exported");
+      } else {
+        FileSystem fs = 
atlasReplInfo.getStagingDir().getFileSystem(atlasReplInfo.getConf());
+        Path exportFilePath = new Path(atlasReplInfo.getStagingDir(), 
ReplUtils.REPL_ATLAS_EXPORT_FILE_NAME);
+        numBytesWritten = Utils.writeFile(fs, exportFilePath, inputStream, 
conf);
+      }
     } catch (SemanticException ex) {
       throw ex;
     } catch (Exception ex) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
index 61b3652..3c6fdaa 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
@@ -180,7 +180,8 @@ public class ReplLoadTask extends Task<ReplLoadWork> 
implements Serializable {
   private void addAtlasLoadTask() throws HiveException {
     Path atlasDumpDir = new Path(new Path(work.dumpDirectory).getParent(), 
ReplUtils.REPL_ATLAS_BASE_DIR);
     LOG.info("Adding task to load Atlas metadata from {} ", atlasDumpDir);
-    AtlasLoadWork atlasLoadWork = new AtlasLoadWork(work.getSourceDbName(), 
work.dbNameToLoadIn, atlasDumpDir,
+    String targetDbName = StringUtils.isEmpty(work.dbNameToLoadIn) ? 
work.getSourceDbName() : work.dbNameToLoadIn;
+    AtlasLoadWork atlasLoadWork = new AtlasLoadWork(work.getSourceDbName(), 
targetDbName, atlasDumpDir,
         work.getMetricCollector());
     Task<?> atlasLoadTask = TaskFactory.get(atlasLoadWork, conf);
     if (childTasks == null) {
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClientImpl.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClientImpl.java
index e4b294d..ed7485d 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClientImpl.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClientImpl.java
@@ -95,7 +95,7 @@ public class AtlasRestClientImpl extends 
RetryingClientTimeBased implements Atla
         SecurityUtils.reloginExpiringKeytabUser();
         return clientV2.exportData(request);
       }
-    }, null);
+    });
   }
 
   public AtlasImportResult importData(AtlasImportRequest request, 
AtlasReplInfo atlasReplInfo) throws Exception {
@@ -103,6 +103,7 @@ public class AtlasRestClientImpl extends 
RetryingClientTimeBased implements Atla
     Path exportFilePath = new Path(atlasReplInfo.getStagingDir(), 
ReplUtils.REPL_ATLAS_EXPORT_FILE_NAME);
     FileSystem fs = FileSystem.get(exportFilePath.toUri(), 
atlasReplInfo.getConf());
     if (!fs.exists(exportFilePath)) {
+      LOG.info("There is nothing to load, returning the default result.");
       return defaultResult;
     }
     LOG.debug("Atlas import data request: {}" + request);
@@ -120,7 +121,7 @@ public class AtlasRestClientImpl extends 
RetryingClientTimeBased implements Atla
           }
         }
       }
-    }, defaultResult);
+    });
   }
 
   private AtlasImportResult getDefaultAtlasImportResult(AtlasImportRequest 
request) {
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/RetryingClientTimeBased.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/RetryingClientTimeBased.java
index 6ddb114..c84567e 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/RetryingClientTimeBased.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/RetryingClientTimeBased.java
@@ -43,10 +43,10 @@ public class RetryingClientTimeBased {
   protected double backOff;
   protected int maxJitterInSeconds;
 
-  protected <T> T invokeWithRetry(Callable<T> func, T defaultReturnValue) 
throws SemanticException {
+  protected <T> T invokeWithRetry(Callable<T> func) throws SemanticException {
     long startTime = System.currentTimeMillis();
     long delay = this.initialDelayInSeconds;
-    while (elapsedTimeInSeconds(startTime) + delay > 
this.totalDurationInSeconds) {
+    while (true) {
       try {
         LOG.debug("Retrying method: {}", func.getClass().getName(), null);
         return func.call();
@@ -54,16 +54,19 @@ public class RetryingClientTimeBased {
         if (processImportExportLockException(e, delay)) {
           //retry case. compute next sleep time
           delay = getNextDelay(delay);
+          if (elapsedTimeInSeconds(startTime) + delay > 
this.totalDurationInSeconds) {
+            throw new 
SemanticException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(), e);
+          }
           continue;
         }
         if (processInvalidParameterException(e)) {
+          LOG.info("There is nothing to export/import.");
           return null;
         }
         LOG.error(func.getClass().getName(), e);
         throw new SemanticException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(), e);
       }
     }
-    return defaultReturnValue;
   }
 
   private long getNextDelay(long currentDelay) {
@@ -109,8 +112,8 @@ public class RetryingClientTimeBased {
     String excMessage = e.getMessage() == null ? "" : e.getMessage();
     if (excMessage.contains(ERROR_MESSAGE_IN_PROGRESS)) {
       try {
-        LOG.info("Atlas in-progress operation detected. Will pause for: {} 
ms", delay);
-        Thread.sleep(delay);
+        LOG.info("Atlas in-progress operation detected. Will pause for: {} 
seconds", delay);
+        Thread.sleep(delay * 1000L);
       } catch (InterruptedException intEx) {
         LOG.error("Pause wait interrupted!", intEx);
         throw new SemanticException(intEx);
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ranger/RangerRestClientImpl.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ranger/RangerRestClientImpl.java
index 31081ab..5b2fe4e 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ranger/RangerRestClientImpl.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ranger/RangerRestClientImpl.java
@@ -202,9 +202,13 @@ public class RangerRestClientImpl implements 
RangerRestClient {
     Retryable retryable = Retryable.builder()
       .withHiveConf(hiveConf)
       .withRetryOnException(Exception.class).build();
-    return retryable.executeCallable(() -> 
importRangerPoliciesPlain(jsonRangerExportPolicyList,
-      rangerPoliciesJsonFileName,
-      serviceMapJsonFileName, jsonServiceMap, finalUrl, 
rangerExportPolicyList));
+    try {
+      return retryable.executeCallable(() -> 
importRangerPoliciesPlain(jsonRangerExportPolicyList,
+              rangerPoliciesJsonFileName,
+              serviceMapJsonFileName, jsonServiceMap, finalUrl, 
rangerExportPolicyList));
+    } catch (Exception e) {
+      throw new 
SemanticException(ErrorMsg.REPL_RETRY_EXHAUSTED.format(e.getMessage()), e);
+    }
   }
 
   private RangerExportPolicyList importRangerPoliciesPlain(String 
jsonRangerExportPolicyList,
diff --git 
a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestAtlasDumpTask.java 
b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestAtlasDumpTask.java
index fbc6dfc..935dc99 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestAtlasDumpTask.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestAtlasDumpTask.java
@@ -18,16 +18,21 @@
 
 package org.apache.hadoop.hive.ql.exec.repl;
 
+import org.apache.atlas.AtlasClientV2;
+import org.apache.atlas.AtlasServiceException;
+import org.apache.atlas.model.impexp.AtlasExportRequest;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasReplInfo;
 import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasRequestBuilder;
 import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasRestClient;
 import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasRestClientBuilder;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasRestClientImpl;
 import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.parse.repl.ReplState;
 import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector;
+import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.junit.Assert;
 import org.junit.Test;
@@ -43,7 +48,11 @@ import org.slf4j.LoggerFactory;
 
 import static org.mockito.ArgumentMatchers.any;
 
+import java.io.ByteArrayInputStream;
 import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.TimeUnit;
 
 import static org.mockito.Mockito.mock;
 import static org.powermock.api.mockito.PowerMockito.mockStatic;
@@ -114,4 +123,56 @@ public class TestAtlasDumpTask {
     AtlasRestClient atlasClient = atlasRestCleintBuilder.getClient(conf);
     Assert.assertTrue(atlasClient != null);
   }
+
+  @Test
+  public void testRetryingClientTimeBased() throws SemanticException, 
IOException, AtlasServiceException {
+    AtlasClientV2 atlasClientV2 = mock(AtlasClientV2.class);
+    AtlasExportRequest exportRequest = mock(AtlasExportRequest.class);
+    String exportResponseData = "dumpExportContent";
+    InputStream exportedMetadataIS = new 
ByteArrayInputStream(exportResponseData.getBytes(StandardCharsets.UTF_8));
+    
when(atlasClientV2.exportData(any(AtlasExportRequest.class))).thenReturn(exportedMetadataIS);
+    when(exportRequest.toString()).thenReturn("dummyExportRequest");
+    when(conf.getTimeVar(HiveConf.ConfVars.REPL_RETRY_TOTAL_DURATION, 
TimeUnit.SECONDS)).thenReturn(60L);
+    when(conf.getTimeVar(HiveConf.ConfVars.REPL_RETRY_INTIAL_DELAY, 
TimeUnit.SECONDS)).thenReturn(1L);
+    AtlasRestClient atlasClient = new AtlasRestClientImpl(atlasClientV2, conf);
+    AtlasRestClientImpl atlasRestClientImpl = (AtlasRestClientImpl)atlasClient;
+    InputStream inputStream = atlasRestClientImpl.exportData(exportRequest);
+    ArgumentCaptor<AtlasExportRequest> expReqCaptor = 
ArgumentCaptor.forClass(AtlasExportRequest.class);
+    Mockito.verify(atlasClientV2, 
Mockito.times(1)).exportData(expReqCaptor.capture());
+    Assert.assertEquals(expReqCaptor.getValue().toString(), 
"dummyExportRequest");
+    byte[] exportResponseDataReadBytes = new byte[exportResponseData.length()];
+    inputStream.read(exportResponseDataReadBytes);
+    String exportResponseDataReadString = new 
String(exportResponseDataReadBytes, StandardCharsets.UTF_8);
+    Assert.assertEquals(exportResponseData, exportResponseDataReadString);
+  }
+
+  @Test
+  public void testRetryingClientTimeBasedExhausted() throws 
AtlasServiceException {
+    AtlasClientV2 atlasClientV2 = mock(AtlasClientV2.class);
+    AtlasExportRequest exportRequest = mock(AtlasExportRequest.class);
+    AtlasServiceException atlasServiceException = 
mock(AtlasServiceException.class);
+    when(atlasServiceException.getMessage()).thenReturn("import or export is 
in progress");
+    
when(atlasClientV2.exportData(any(AtlasExportRequest.class))).thenThrow(atlasServiceException);
+    when(exportRequest.toString()).thenReturn("dummyExportRequest");
+    when(conf.getTimeVar(HiveConf.ConfVars.REPL_RETRY_TOTAL_DURATION, 
TimeUnit.SECONDS)).thenReturn(60L);
+    when(conf.getTimeVar(HiveConf.ConfVars.REPL_RETRY_INTIAL_DELAY, 
TimeUnit.SECONDS)).thenReturn(10L);
+    
when(conf.getTimeVar(HiveConf.ConfVars.REPL_RETRY_MAX_DELAY_BETWEEN_RETRIES, 
TimeUnit.SECONDS)).thenReturn(20L);
+    
when(conf.getFloatVar(HiveConf.ConfVars.REPL_RETRY_BACKOFF_COEFFICIENT)).thenReturn(2.0f);
+    AtlasRestClient atlasClient = new AtlasRestClientImpl(atlasClientV2, conf);
+    AtlasRestClientImpl atlasRestClientImpl = (AtlasRestClientImpl)atlasClient;
+    InputStream inputStream = null;
+    try {
+      inputStream = atlasRestClientImpl.exportData(exportRequest);
+      Assert.fail("Should have thrown SemanticException.");
+    } catch (SemanticException ex) {
+      Assert.assertTrue(ex.getMessage().contains("Retry exhausted for 
retryable error code"));
+      Assert.assertTrue(atlasServiceException == ex.getCause());
+    }
+    ArgumentCaptor<AtlasExportRequest> expReqCaptor = 
ArgumentCaptor.forClass(AtlasExportRequest.class);
+    Mockito.verify(atlasClientV2, 
Mockito.times(3)).exportData(expReqCaptor.capture());
+    for (AtlasExportRequest atlasExportRequest: expReqCaptor.getAllValues()) {
+      Assert.assertEquals(atlasExportRequest.toString(), "dummyExportRequest");
+    }
+    Assert.assertTrue(inputStream == null);
+  }
 }

Reply via email to