This is an automated email from the ASF dual-hosted git repository.
asnaik pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/ambari.git
The following commit(s) were added to refs/heads/branch-2.6 by this push:
new c763113 [AMBARI-25362] Hive View throws TimeoutException deadline
passed for few queries randomly. (Sreenath Somarajapuram) (#3072)
c763113 is described below
commit c763113e134d238f3f159dabd8de359cb40bff13
Author: Asnaik HWX <[email protected]>
AuthorDate: Tue Oct 1 12:12:22 2019 +0530
[AMBARI-25362] Hive View throws TimeoutException deadline passed for few
queries randomly. (Sreenath Somarajapuram) (#3072)
---
.../org/apache/ambari/view/hive2/client/AsyncJobRunnerImpl.java | 9 +++++----
.../org/apache/ambari/view/hive2/client/NonPersistentCursor.java | 5 ++---
.../org/apache/ambari/view/hive20/client/AsyncJobRunnerImpl.java | 9 +++++----
.../apache/ambari/view/hive20/client/NonPersistentCursor.java | 5 ++---
4 files changed, 14 insertions(+), 14 deletions(-)
diff --git
a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/client/AsyncJobRunnerImpl.java
b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/client/AsyncJobRunnerImpl.java
index 82e3df5..e19415e 100644
---
a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/client/AsyncJobRunnerImpl.java
+++
b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/client/AsyncJobRunnerImpl.java
@@ -48,6 +48,7 @@ import java.util.concurrent.TimeUnit;
public class AsyncJobRunnerImpl implements AsyncJobRunner {
private final Logger LOG = LoggerFactory.getLogger(getClass());
+ private static scala.concurrent.duration.FiniteDuration WAIT_TIME =
Duration.create(12, TimeUnit.HOURS);
private final ActorRef controller;
private final ActorSystem system;
@@ -76,7 +77,7 @@ public class AsyncJobRunnerImpl implements AsyncJobRunner {
public Optional<NonPersistentCursor> getCursor(String jobId, String
username) {
Inbox inbox = Inbox.create(system);
inbox.send(controller, new FetchResult(jobId, username));
- Object receive = inbox.receive(Duration.create(1, TimeUnit.MINUTES));
+ Object receive = inbox.receive(WAIT_TIME);
if(receive instanceof ResultNotReady) {
String errorString = "Result not ready for job: " + jobId + ", username:
" + username + ". Try after sometime.";
LOG.info(errorString);
@@ -98,7 +99,7 @@ public class AsyncJobRunnerImpl implements AsyncJobRunner {
public Optional<NonPersistentCursor> resetAndGetCursor(String jobId, String
username) {
Inbox inbox = Inbox.create(system);
inbox.send(controller, new FetchResult(jobId, username));
- Object receive = inbox.receive(Duration.create(1, TimeUnit.MINUTES));
+ Object receive = inbox.receive(WAIT_TIME);
if(receive instanceof ResultNotReady) {
String errorString = "Result not ready for job: " + jobId + ", username:
" + username + ". Try after sometime.";
LOG.info(errorString);
@@ -110,7 +111,7 @@ public class AsyncJobRunnerImpl implements AsyncJobRunner {
Optional<ActorRef> iterator = (Optional<ActorRef>) receive;
if(iterator.isPresent()) {
inbox.send(iterator.get(), new ResetCursor());
- Object resetResult = inbox.receive(Duration.create(1,
TimeUnit.MINUTES));
+ Object resetResult = inbox.receive(WAIT_TIME);
if (resetResult instanceof CursorReset) {
return Optional.of(new NonPersistentCursor(context, system,
iterator.get()));
} else {
@@ -126,7 +127,7 @@ public class AsyncJobRunnerImpl implements AsyncJobRunner {
public Optional<Failure> getError(String jobId, String username) {
Inbox inbox = Inbox.create(system);
inbox.send(controller, new FetchError(jobId, username));
- Object receive = inbox.receive(Duration.create(1, TimeUnit.MINUTES));
+ Object receive = inbox.receive(WAIT_TIME);
if(receive instanceof FetchFailed){
FetchFailed fetchFailed = (FetchFailed) receive;
return Optional.of(new Failure(fetchFailed.getMessage(),
getExceptionForRetry()));
diff --git
a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/client/NonPersistentCursor.java
b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/client/NonPersistentCursor.java
index 13cab33..1e43c07 100644
---
a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/client/NonPersistentCursor.java
+++
b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/client/NonPersistentCursor.java
@@ -45,7 +45,7 @@ import java.util.concurrent.TimeUnit;
*/
public class NonPersistentCursor implements Cursor<Row, ColumnDescription> {
private final Logger LOG = LoggerFactory.getLogger(getClass());
- private static long DEFAULT_WAIT_TIMEOUT = 60 * 1000L;
+ private static scala.concurrent.duration.FiniteDuration WAIT_TIME =
Duration.create(12, TimeUnit.HOURS);
private final ActorSystem system;
private final ActorRef actorRef;
@@ -124,8 +124,7 @@ public class NonPersistentCursor implements Cursor<Row,
ColumnDescription> {
inbox.send(actorRef, new Next());
Object receive;
try {
- receive =
inbox.receive(Duration.create(actorConfiguration.getResultFetchTimeout(DEFAULT_WAIT_TIMEOUT),
- TimeUnit.MILLISECONDS));
+ receive = inbox.receive(WAIT_TIME);
} catch (Throwable ex) {
String errorMessage = "Result fetch timed out";
LOG.error(errorMessage, ex);
diff --git
a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/client/AsyncJobRunnerImpl.java
b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/client/AsyncJobRunnerImpl.java
index 7013f8a..7804103 100644
---
a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/client/AsyncJobRunnerImpl.java
+++
b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/client/AsyncJobRunnerImpl.java
@@ -48,6 +48,7 @@ import java.util.concurrent.TimeUnit;
public class AsyncJobRunnerImpl implements AsyncJobRunner {
private final Logger LOG = LoggerFactory.getLogger(getClass());
+ private static scala.concurrent.duration.FiniteDuration WAIT_TIME =
Duration.create(12, TimeUnit.HOURS);
private final ActorRef controller;
private final ActorSystem system;
@@ -76,7 +77,7 @@ public class AsyncJobRunnerImpl implements AsyncJobRunner {
public Optional<NonPersistentCursor> getCursor(String jobId, String
username) {
Inbox inbox = Inbox.create(system);
inbox.send(controller, new FetchResult(jobId, username));
- Object receive = inbox.receive(Duration.create(1, TimeUnit.MINUTES));
+ Object receive = inbox.receive(WAIT_TIME);
if(receive instanceof ResultNotReady) {
String errorString = "Result not ready for job: " + jobId + ", username:
" + username + ". Try after sometime.";
LOG.info(errorString);
@@ -98,7 +99,7 @@ public class AsyncJobRunnerImpl implements AsyncJobRunner {
public Optional<NonPersistentCursor> resetAndGetCursor(String jobId, String
username) {
Inbox inbox = Inbox.create(system);
inbox.send(controller, new FetchResult(jobId, username));
- Object receive = inbox.receive(Duration.create(1, TimeUnit.MINUTES));
+ Object receive = inbox.receive(WAIT_TIME);
if(receive instanceof ResultNotReady) {
String errorString = "Result not ready for job: " + jobId + ", username:
" + username + ". Try after sometime.";
LOG.info(errorString);
@@ -110,7 +111,7 @@ public class AsyncJobRunnerImpl implements AsyncJobRunner {
Optional<ActorRef> iterator = (Optional<ActorRef>) receive;
if(iterator.isPresent()) {
inbox.send(iterator.get(), new ResetCursor());
- Object resetResult = inbox.receive(Duration.create(1,
TimeUnit.MINUTES));
+ Object resetResult = inbox.receive(WAIT_TIME);
if (resetResult instanceof CursorReset) {
return Optional.of(new NonPersistentCursor(context, system,
iterator.get()));
} else {
@@ -126,7 +127,7 @@ public class AsyncJobRunnerImpl implements AsyncJobRunner {
public Optional<Failure> getError(String jobId, String username) {
Inbox inbox = Inbox.create(system);
inbox.send(controller, new FetchError(jobId, username));
- Object receive = inbox.receive(Duration.create(1, TimeUnit.MINUTES));
+ Object receive = inbox.receive(WAIT_TIME);
if(receive instanceof FetchFailed){
FetchFailed fetchFailed = (FetchFailed) receive;
return Optional.of(new Failure(fetchFailed.getMessage(),
getExceptionForRetry()));
diff --git
a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/client/NonPersistentCursor.java
b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/client/NonPersistentCursor.java
index 80ffe79..c316579 100644
---
a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/client/NonPersistentCursor.java
+++
b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/client/NonPersistentCursor.java
@@ -45,7 +45,7 @@ import java.util.concurrent.TimeUnit;
*/
public class NonPersistentCursor implements Cursor<Row, ColumnDescription> {
private final Logger LOG = LoggerFactory.getLogger(getClass());
- private static long DEFAULT_WAIT_TIMEOUT = 60 * 1000L;
+ private static scala.concurrent.duration.FiniteDuration WAIT_TIME =
Duration.create(12, TimeUnit.HOURS);
private final ActorSystem system;
private final ActorRef actorRef;
@@ -124,8 +124,7 @@ public class NonPersistentCursor implements Cursor<Row,
ColumnDescription> {
inbox.send(actorRef, new Next());
Object receive;
try {
- receive =
inbox.receive(Duration.create(actorConfiguration.getResultFetchTimeout(DEFAULT_WAIT_TIMEOUT),
- TimeUnit.MILLISECONDS));
+ receive = inbox.receive(WAIT_TIME);
} catch (Throwable ex) {
String errorMessage = "Result fetch timed out";
LOG.error(errorMessage, ex);