aasha commented on a change in pull request #1573:
URL: https://github.com/apache/hive/pull/1573#discussion_r503824826
##########
File path:
ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestAtlasDumpTask.java
##########
@@ -114,4 +123,55 @@ public void testAtlasRestClientBuilder() throws
SemanticException, IOException {
AtlasRestClient atlasClient = atlasRestCleintBuilder.getClient(conf);
Assert.assertTrue(atlasClient != null);
}
+
+ @Test
+ public void testRetryingClientTimeBased() throws SemanticException,
IOException, AtlasServiceException {
+ AtlasClientV2 atlasClientV2 = mock(AtlasClientV2.class);
+ AtlasExportRequest exportRequest = mock(AtlasExportRequest.class);
+ String exportResponseData = "dumpExportContent";
+ InputStream exportedMetadataIS = new
ByteArrayInputStream(exportResponseData.getBytes(StandardCharsets.UTF_8));
+
when(atlasClientV2.exportData(any(AtlasExportRequest.class))).thenReturn(exportedMetadataIS);
+ when(exportRequest.toString()).thenReturn("dummyExportRequest");
+ when(conf.getTimeVar(HiveConf.ConfVars.REPL_RETRY_TOTAL_DURATION,
TimeUnit.SECONDS)).thenReturn(60L);
+ when(conf.getTimeVar(HiveConf.ConfVars.REPL_RETRY_INTIAL_DELAY,
TimeUnit.SECONDS)).thenReturn(1L);
+ AtlasRestClient atlasClient = new AtlasRestClientImpl(atlasClientV2, conf);
+ AtlasRestClientImpl atlasRestClientImpl = (AtlasRestClientImpl)atlasClient;
+ InputStream inputStream = atlasRestClientImpl.exportData(exportRequest);
+ ArgumentCaptor<AtlasExportRequest> expReqCaptor =
ArgumentCaptor.forClass(AtlasExportRequest.class);
+ Mockito.verify(atlasClientV2,
Mockito.times(1)).exportData(expReqCaptor.capture());
+ Assert.assertEquals(expReqCaptor.getValue().toString(),
"dummyExportRequest");
+ byte[] exportResponseDataReadBytes = new byte[exportResponseData.length()];
+ inputStream.read(exportResponseDataReadBytes);
+ String exportResponseDataReadString = new
String(exportResponseDataReadBytes, StandardCharsets.UTF_8);
+ Assert.assertEquals(exportResponseData, exportResponseDataReadString);
+ }
+
+ @Test
+ public void testRetryingClientTimeBasedExhausted() throws
AtlasServiceException {
+ AtlasClientV2 atlasClientV2 = mock(AtlasClientV2.class);
+ AtlasExportRequest exportRequest = mock(AtlasExportRequest.class);
+ AtlasServiceException atlasServiceException =
mock(AtlasServiceException.class);
+ when(atlasServiceException.getMessage()).thenReturn("import or export is
in progress");
+
when(atlasClientV2.exportData(any(AtlasExportRequest.class))).thenThrow(atlasServiceException);
+ when(exportRequest.toString()).thenReturn("dummyExportRequest");
+ when(conf.getTimeVar(HiveConf.ConfVars.REPL_RETRY_TOTAL_DURATION,
TimeUnit.SECONDS)).thenReturn(60L);
+ when(conf.getTimeVar(HiveConf.ConfVars.REPL_RETRY_INTIAL_DELAY,
TimeUnit.SECONDS)).thenReturn(10L);
+
when(conf.getTimeVar(HiveConf.ConfVars.REPL_RETRY_MAX_DELAY_BETWEEN_RETRIES,
TimeUnit.SECONDS)).thenReturn(20L);
+
when(conf.getFloatVar(HiveConf.ConfVars.REPL_RETRY_BACKOFF_COEFFICIENT)).thenReturn(2.0f);
+ AtlasRestClient atlasClient = new AtlasRestClientImpl(atlasClientV2, conf);
+ AtlasRestClientImpl atlasRestClientImpl = (AtlasRestClientImpl)atlasClient;
+ InputStream inputStream = null;
+ try {
+ inputStream = atlasRestClientImpl.exportData(exportRequest);
+ Assert.fail("Should have thrown SemanticException.");
+ } catch (SemanticException ex) {
+ Assert.assertTrue(ex.getMessage().contains("Retry exhausted for
retryable error code"));
Review comment:
The actual exception should also be present
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java
##########
@@ -194,9 +194,11 @@ long dumpAtlasMetaData(AtlasRequestBuilder
atlasRequestBuilder, AtlasReplInfo at
AtlasExportRequest exportRequest =
atlasRequestBuilder.createExportRequest(atlasReplInfo,
atlasReplInfo.getSrcCluster());
inputStream = atlasRestClient.exportData(exportRequest);
- FileSystem fs =
atlasReplInfo.getStagingDir().getFileSystem(atlasReplInfo.getConf());
- Path exportFilePath = new Path(atlasReplInfo.getStagingDir(),
ReplUtils.REPL_ATLAS_EXPORT_FILE_NAME);
- numBytesWritten = Utils.writeFile(fs, exportFilePath, inputStream, conf);
+ if (inputStream != null) {
+ FileSystem fs =
atlasReplInfo.getStagingDir().getFileSystem(atlasReplInfo.getConf());
+ Path exportFilePath = new Path(atlasReplInfo.getStagingDir(),
ReplUtils.REPL_ATLAS_EXPORT_FILE_NAME);
+ numBytesWritten = Utils.writeFile(fs, exportFilePath, inputStream,
conf);
+ }
Review comment:
An error log for a null input stream would help
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]