Repository: ambari Updated Branches: refs/heads/branch-2.4 22cc2d5fa -> 600573784
http://git-wip-us.apache.org/repos/asf/ambari/blob/60057378/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/AssignStatement.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/AssignStatement.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/AssignStatement.java deleted file mode 100644 index 85273ab..0000000 --- a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/AssignStatement.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ambari.view.hive2.actor.message; - -import com.google.common.base.Optional; - -import java.sql.Statement; - -public class AssignStatement { - - private Statement resultSet; - - public AssignStatement(Statement statement) { - this.resultSet = statement; - } - - - public Statement getStatement() { - return resultSet; - } - - @Override - public String toString() { - return "AssignStatement{" + - "resultSet=" + resultSet + - '}'; - } - - -} http://git-wip-us.apache.org/repos/asf/ambari/blob/60057378/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/AsyncJob.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/AsyncJob.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/AsyncJob.java deleted file mode 100644 index 6dfd709..0000000 --- a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/AsyncJob.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ambari.view.hive2.actor.message; - -import org.apache.ambari.view.ViewContext; - -/** - * Message to be sent when a statement has to be executed - */ -public class AsyncJob extends DDLJob { - private final String jobId; - private final String logFile; - - public AsyncJob(String jobId, String username, String[] statements, String logFile,ViewContext viewContext) { - super(Type.ASYNC, statements, username,viewContext); - this.jobId = jobId; - this.logFile = logFile; - } - - public String getJobId() { - return jobId; - } - - public String getLogFile() { - return logFile; - } - - - @Override - public String toString() { - return "AsyncJob{" + - "jobId='" + jobId + '\'' + - ", logFile='" + logFile + '\'' + - "} " + super.toString(); - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/60057378/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/Connect.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/Connect.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/Connect.java index b859ac1..49ef3df 100644 --- a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/Connect.java +++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/Connect.java @@ -18,6 +18,7 @@ package org.apache.ambari.view.hive2.actor.message; +import com.google.common.base.Optional; import org.apache.ambari.view.hive2.internal.Connectable; import org.apache.ambari.view.hive2.internal.HiveConnectionWrapper; @@ -26,17 +27,29 @@ import org.apache.ambari.view.hive2.internal.HiveConnectionWrapper; */ public class Connect { + private final HiveJob.Type type; + private final String jobId; private final String username; private final String password; private final String jdbcUrl; - public Connect(String username, String password, String jdbcUrl) { + private Connect(HiveJob.Type type, String jobId, String username, String password, String jdbcUrl) { + this.type = type; + this.jobId = jobId; this.username = username; this.password = password; this.jdbcUrl = jdbcUrl; } + public Connect(String jobId, String username, String password, String jdbcUrl) { + this(HiveJob.Type.ASYNC, jobId, username, password, jdbcUrl); + } + + public Connect(String username, String password, String jdbcUrl) { + this(HiveJob.Type.SYNC, null, username, password, jdbcUrl); + } + public Connectable getConnectable(){ return new HiveConnectionWrapper(getJdbcUrl(),username,password); } @@ -53,4 +66,11 @@ public class Connect { return jdbcUrl; } + public HiveJob.Type getType() { + return type; + } + + public Optional<String> getJobId() { + return Optional.fromNullable(jobId); + } } http://git-wip-us.apache.org/repos/asf/ambari/blob/60057378/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/DDLJob.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/DDLJob.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/DDLJob.java deleted file mode 100644 index 7e19a77..0000000 --- a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/DDLJob.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ambari.view.hive2.actor.message; - -import com.google.common.collect.ImmutableList; -import org.apache.ambari.view.ViewContext; -import org.apache.commons.lang.StringUtils; - -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; - - -public class DDLJob extends HiveJob { - - public static final String SEMICOLON = ";"; - private String[] statements; - - public DDLJob(Type type, String[] statements, String username, ViewContext viewContext) { - super(type, username, viewContext); - this.statements = new String[statements.length]; - for (int i = 0; i < statements.length; i++) { - this.statements[i] = clean(statements[i]); - - } - - } - - private String clean(String statement) { - return StringUtils.trim(statement); - } - - public Collection<String> getStatements() { - return Arrays.asList(statements); - } - - /** - * Get the statements to be executed synchronously - * - * @return - */ - public Collection<String> getSyncStatements() { - if (!(statements.length > 1)) - return Collections.emptyList(); - else - return ImmutableList.copyOf(Arrays.copyOfRange(statements, 0, statements.length - 1)); - } - - /** - * Get the statement to be executed asynchronously - * - * @return async statement - */ - public String getAsyncStatement() { - return statements[statements.length - 1]; - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/60057378/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/GetColumnMetadataJob.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/GetColumnMetadataJob.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/GetColumnMetadataJob.java index defa08c..e285e36 100644 --- a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/GetColumnMetadataJob.java +++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/GetColumnMetadataJob.java @@ -24,6 +24,7 @@ public class GetColumnMetadataJob extends HiveJob { private final String schemaPattern; private final String tablePattern; private final String columnPattern; + public GetColumnMetadataJob(String username, ViewContext viewContext, String schemaPattern, String tablePattern, String columnPattern) { super(Type.SYNC, username, viewContext); http://git-wip-us.apache.org/repos/asf/ambari/blob/60057378/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/JobExecutionCompleted.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/JobExecutionCompleted.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/JobExecutionCompleted.java deleted file mode 100644 index 52ba3f5..0000000 --- a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/JobExecutionCompleted.java +++ /dev/null @@ -1,21 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ambari.view.hive2.actor.message; - -public class JobExecutionCompleted {} http://git-wip-us.apache.org/repos/asf/ambari/blob/60057378/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/ResultInformation.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/ResultInformation.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/ResultInformation.java new file mode 100644 index 0000000..bf1d21e --- /dev/null +++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/ResultInformation.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.view.hive2.actor.message; + +import com.google.common.base.Optional; +import org.apache.ambari.view.hive2.actor.message.job.Failure; + +import java.sql.ResultSet; + +/** + * Message used to send execution complete message. + * It may contain a ResultSet if the execution returns a ResultSet. + */ +public class ResultInformation { + /** + * Execution id to identify the result correspondence of the result with the request + */ + private final int id; + + /** + * If the execution returns a ResultSet then this will refer to the ResultSet + */ + private final ResultSet resultSet; + + private final Failure failure; + + private final boolean cancelled; + + private ResultInformation(int id, ResultSet resultSet, Failure failure, boolean cancelled) { + this.id = id; + this.resultSet = resultSet; + this.failure = failure; + this.cancelled = cancelled; + } + + public ResultInformation(int id, ResultSet resultSet) { + this(id, resultSet, null, false); + } + + public ResultInformation(int id) { + this(id, null, null, false); + } + + public ResultInformation(int id, Failure failure) { + this(id, null, failure, false); + } + + public ResultInformation(int id, boolean cancelled) { + this(id, null, null, cancelled); + } + + public int getId() { + return id; + } + + public Optional<ResultSet> getResultSet() { + return Optional.fromNullable(resultSet); + } + + public Optional<Failure> getFailure() { + return Optional.fromNullable(failure); + } + + public boolean isCancelled() { + return cancelled; + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/60057378/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/ResultNotReady.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/ResultNotReady.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/ResultNotReady.java new file mode 100644 index 0000000..6678d68 --- /dev/null +++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/ResultNotReady.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.view.hive2.actor.message; + +/** + * In case of a async execution, this is used to tell that the result has not completed and is not ready to be + * returned back + */ +public class ResultNotReady { + private final String jobId; + private final String username; + public ResultNotReady(String jobId, String username) { + this.jobId = jobId; + this.username = username; + } + + public String getJobId() { + return jobId; + } + + public String getUsername() { + return username; + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/60057378/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/ResultReady.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/ResultReady.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/ResultReady.java index 65de920..cb54cbd 100644 --- a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/ResultReady.java +++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/ResultReady.java @@ -19,29 +19,26 @@ package org.apache.ambari.view.hive2.actor.message; import akka.actor.ActorRef; -import org.apache.ambari.view.hive2.actor.message.job.AsyncExecutionFailed; -import org.apache.ambari.view.hive2.internal.Either; +import com.google.common.base.Optional; /** - * * Fetch the result for - * */ public class ResultReady extends FetchResult { - private Either<ActorRef, ActorRef> result; + private final ActorRef result; - public ResultReady(String jobId, String username, Either<ActorRef, ActorRef> result) { - super(jobId, username); - this.result = result; - } + public ResultReady(String jobId, String username, ActorRef result) { + super(jobId, username); + this.result = result; + } - public Either<ActorRef, ActorRef> getResult() { - return result; - } + public ResultReady(String jobId, String username) { + this(jobId, username, null); + } - public void setResult(Either<ActorRef, ActorRef> result) { - this.result = result; - } + public Optional<ActorRef> getResult() { + return Optional.fromNullable(result); + } } http://git-wip-us.apache.org/repos/asf/ambari/blob/60057378/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/RunStatement.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/RunStatement.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/RunStatement.java new file mode 100644 index 0000000..8679c44 --- /dev/null +++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/RunStatement.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.view.hive2.actor.message; + +import com.google.common.base.Optional; + +/** + * Message sent by JdbcConnector to StatementExecutor to run a statement + */ +public class RunStatement { + /** + * This is the execution id meant to identify the executing statement sequence + */ + private final int id; + private final String statement; + private final String logFile; + private final String jobId; + private final boolean startLogAggregation; + private final boolean startGUIDFetch; + + public RunStatement(int id, String statement, String jobId, boolean startLogAggregation, String logFile, boolean startGUIDFetch) { + this.id = id; + this.statement = statement; + this.jobId = jobId; + this.logFile = logFile; + this.startLogAggregation = startLogAggregation; + this.startGUIDFetch = startGUIDFetch; + } + + public RunStatement(int id, String statement) { + this(id, statement, null, false, null, false); + } + + public int getId() { + return id; + } + + public String getStatement() { + return statement; + } + + public Optional<String> getLogFile() { + return Optional.fromNullable(logFile); + } + + public boolean shouldStartLogAggregation() { + return startLogAggregation; + } + + public boolean shouldStartGUIDFetch() { + return startGUIDFetch; + } + + public Optional<String> getJobId() { + return Optional.fromNullable(jobId); + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/60057378/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/SQLStatementJob.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/SQLStatementJob.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/SQLStatementJob.java new file mode 100644 index 0000000..f5af068 --- /dev/null +++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/SQLStatementJob.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.view.hive2.actor.message; + +import com.google.common.base.Optional; +import org.apache.ambari.view.ViewContext; +import org.apache.commons.lang.StringUtils; + +import java.util.Arrays; +import java.util.Collection; + + +public class SQLStatementJob extends HiveJob { + + public static final String SEMICOLON = ";"; + private String[] statements; + + private final String jobId; + private final String logFile; + + public SQLStatementJob(Type type, String[] statements, String username, String jobId, String logFile, ViewContext viewContext) { + super(type, username, viewContext); + this.statements = new String[statements.length]; + this.jobId = jobId; + this.logFile = logFile; + for (int i = 0; i < statements.length; i++) { + this.statements[i] = clean(statements[i]); + } + } + public SQLStatementJob(Type type, String[] statements, String username, ViewContext viewContext) { + this(type, statements, username, null, null, viewContext); + } + + private String clean(String statement) { + return StringUtils.trim(statement); + } + + public Collection<String> getStatements() { + return Arrays.asList(statements); + } + + public Optional<String> getJobId() { + return Optional.fromNullable(jobId); + } + + public Optional<String> getLogFile() { + return Optional.fromNullable(logFile); + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/60057378/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/StartLogAggregation.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/StartLogAggregation.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/StartLogAggregation.java index 3bae12a..b56da08 100644 --- a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/StartLogAggregation.java +++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/StartLogAggregation.java @@ -18,4 +18,17 @@ package org.apache.ambari.view.hive2.actor.message; -public class StartLogAggregation {} +public class StartLogAggregation { + private String statement; + + public StartLogAggregation() { + } + + public StartLogAggregation(String statement) { + this.statement = statement; + } + + public String getStatement() { + return statement; + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/60057378/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/SyncJob.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/SyncJob.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/SyncJob.java deleted file mode 100644 index 7aece31..0000000 --- a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/SyncJob.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ambari.view.hive2.actor.message; - -import org.apache.ambari.view.ViewContext; - -public class SyncJob extends DDLJob { - public SyncJob(String username, String[] statements,ViewContext viewContext) { - super(Type.SYNC, statements, username,viewContext); - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/60057378/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/job/CancelJob.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/job/CancelJob.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/job/CancelJob.java new file mode 100644 index 0000000..ddaff19 --- /dev/null +++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/job/CancelJob.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.view.hive2.actor.message.job; + +/** + * Message to cancel the currently running job. This is used for stopping execution of a job from api + */ +public class CancelJob { + private final String jobId; + private final String username; + + public CancelJob(String jobId, String username) { + this.jobId = jobId; + this.username = username; + } + + public String getJobId() { + return jobId; + } + + public String getUsername() { + return username; + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/60057378/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/job/ExecuteNextStatement.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/job/ExecuteNextStatement.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/job/ExecuteNextStatement.java new file mode 100644 index 0000000..ff40aab --- /dev/null +++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/job/ExecuteNextStatement.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.view.hive2.actor.message.job; + +public class ExecuteNextStatement { +} http://git-wip-us.apache.org/repos/asf/ambari/blob/60057378/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/job/UpdateYarnAtsGuid.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/job/UpdateYarnAtsGuid.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/job/UpdateYarnAtsGuid.java new file mode 100644 index 0000000..7edbab4 --- /dev/null +++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/job/UpdateYarnAtsGuid.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.view.hive2.actor.message.job; + +import org.apache.hive.jdbc.HiveStatement; + +public class UpdateYarnAtsGuid { + private final HiveStatement statement; + private final String jobId; + public UpdateYarnAtsGuid(HiveStatement statement, String jobId) { + this.statement = statement; + this.jobId = jobId; + } + + public HiveStatement getStatement() { + return statement; + } + + public String getJobId() { + return jobId; + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/60057378/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/client/AsyncJobRunner.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/client/AsyncJobRunner.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/client/AsyncJobRunner.java index 829e57c..548dfae 100644 --- a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/client/AsyncJobRunner.java +++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/client/AsyncJobRunner.java @@ -19,18 +19,21 @@ package org.apache.ambari.view.hive2.client; import com.google.common.base.Optional; -import org.apache.ambari.view.hive2.resources.jobs.viewJobs.Job; -import org.apache.ambari.view.hive2.actor.message.AsyncJob; +import org.apache.ambari.view.hive2.actor.message.SQLStatementJob; import org.apache.ambari.view.hive2.actor.message.job.AsyncExecutionFailed; +import org.apache.ambari.view.hive2.actor.message.job.Failure; +import org.apache.ambari.view.hive2.resources.jobs.viewJobs.Job; public interface AsyncJobRunner { - void submitJob(ConnectionConfig connectionConfig, AsyncJob asyncJob, Job job); + void submitJob(ConnectionConfig connectionConfig, SQLStatementJob asyncJob, Job job); + + void cancelJob(String jobId, String username); - Optional<NonPersistentCursor> getCursor(String jobId, String username); + Optional<NonPersistentCursor> getCursor(String jobId, String username); - Optional<NonPersistentCursor> resetAndGetCursor(String jobId, String username); + Optional<NonPersistentCursor> resetAndGetCursor(String jobId, String username); - Optional<AsyncExecutionFailed> getError(String jobId, String username); + Optional<Failure> getError(String jobId, String username); } http://git-wip-us.apache.org/repos/asf/ambari/blob/60057378/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/client/AsyncJobRunnerImpl.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/client/AsyncJobRunnerImpl.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/client/AsyncJobRunnerImpl.java index f9e6d67..286298a 100644 --- a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/client/AsyncJobRunnerImpl.java +++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/client/AsyncJobRunnerImpl.java @@ -23,16 +23,21 @@ import akka.actor.ActorSystem; import akka.actor.Inbox; import com.google.common.base.Optional; import org.apache.ambari.view.ViewContext; -import org.apache.ambari.view.hive2.actor.message.CursorReset; -import org.apache.ambari.view.hive2.actor.message.FetchError; -import org.apache.ambari.view.hive2.actor.message.ResetCursor; -import org.apache.ambari.view.hive2.resources.jobs.viewJobs.Job; -import org.apache.ambari.view.hive2.actor.message.AsyncJob; import org.apache.ambari.view.hive2.actor.message.Connect; +import org.apache.ambari.view.hive2.actor.message.CursorReset; import org.apache.ambari.view.hive2.actor.message.ExecuteJob; +import org.apache.ambari.view.hive2.actor.message.FetchError; import org.apache.ambari.view.hive2.actor.message.FetchResult; +import org.apache.ambari.view.hive2.actor.message.ResetCursor; +import org.apache.ambari.view.hive2.actor.message.ResultNotReady; +import org.apache.ambari.view.hive2.actor.message.SQLStatementJob; import org.apache.ambari.view.hive2.actor.message.job.AsyncExecutionFailed; -import org.apache.ambari.view.hive2.internal.Either; +import org.apache.ambari.view.hive2.actor.message.job.CancelJob; +import org.apache.ambari.view.hive2.actor.message.job.Failure; +import org.apache.ambari.view.hive2.resources.jobs.viewJobs.Job; +import org.apache.ambari.view.hive2.utils.ResultFetchFormattedException; +import org.apache.ambari.view.hive2.utils.ResultNotReadyFormattedException; +import org.apache.ambari.view.hive2.utils.ServiceFormattedException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.concurrent.duration.Duration; @@ -54,28 +59,38 @@ public class AsyncJobRunnerImpl implements AsyncJobRunner { } + @Override + public void submitJob(ConnectionConfig config, SQLStatementJob job, Job jobp) { + Connect connect = config.createConnectMessage(jobp.getId()); + ExecuteJob executeJob = new ExecuteJob(connect, job); + controller.tell(executeJob, ActorRef.noSender()); + } - @Override - public void submitJob(ConnectionConfig config, AsyncJob job, Job jobp) { - Connect connect = config.createConnectMessage(); - ExecuteJob executeJob = new ExecuteJob(connect, job); - controller.tell(executeJob,ActorRef.noSender()); - } + @Override + public void cancelJob(String jobId, String username) { + controller.tell(new CancelJob(jobId, username), ActorRef.noSender()); + } @Override public Optional<NonPersistentCursor> getCursor(String jobId, String username) { Inbox inbox = Inbox.create(system); inbox.send(controller, new FetchResult(jobId, username)); Object receive = inbox.receive(Duration.create(1, TimeUnit.MINUTES)); - Either<ActorRef, ActorRef> result = (Either<ActorRef, ActorRef>) receive; - if (result.isRight()) { - return Optional.absent(); - - } else if (result.isLeft()) { - return Optional.of(new NonPersistentCursor(context, system, result.getLeft())); + if(receive instanceof ResultNotReady) { + String errorString = "Result not ready for job: " + jobId + ", username: " + username + ". Try after sometime."; + LOG.info(errorString); + throw new ResultNotReadyFormattedException(errorString, new Exception(errorString)); + } else if(receive instanceof Failure) { + Failure failure = (Failure) receive; + throw new ResultFetchFormattedException(failure.getMessage(), failure.getError()); + } else { + Optional<ActorRef> iterator = (Optional<ActorRef>) receive; + if(iterator.isPresent()) { + return Optional.of(new NonPersistentCursor(context, system, iterator.get())); + } else { + return Optional.absent(); + } } - - return Optional.absent(); } @Override @@ -83,33 +98,37 @@ public class AsyncJobRunnerImpl implements AsyncJobRunner { Inbox inbox = Inbox.create(system); inbox.send(controller, new FetchResult(jobId, username)); Object receive = inbox.receive(Duration.create(1, TimeUnit.MINUTES)); - Either<ActorRef, ActorRef> result = (Either<ActorRef, ActorRef>) receive; - if (result.isRight()) { - return Optional.absent(); - - } else if (result.isLeft()) { - // Reset the result set cursor - inbox.send(result.getLeft(), new ResetCursor()); - Object resetResult = inbox.receive(Duration.create(1, TimeUnit.MINUTES)); - if (resetResult instanceof CursorReset) { - return Optional.of(new NonPersistentCursor(context, system, result.getLeft())); + if(receive instanceof ResultNotReady) { + String errorString = "Result not ready for job: " + jobId + ", username: " + username + ". Try after sometime."; + LOG.info(errorString); + throw new ResultNotReadyFormattedException(errorString, new Exception(errorString)); + } else if(receive instanceof Failure) { + Failure failure = (Failure) receive; + throw new ResultFetchFormattedException(failure.getMessage(), failure.getError()); + } else { + Optional<ActorRef> iterator = (Optional<ActorRef>) receive; + if(iterator.isPresent()) { + inbox.send(iterator.get(), new ResetCursor()); + Object resetResult = inbox.receive(Duration.create(1, TimeUnit.MINUTES)); + if (resetResult instanceof CursorReset) { + return Optional.of(new NonPersistentCursor(context, system, iterator.get())); + } else { + return Optional.absent(); + } } else { return Optional.absent(); } - } - - return Optional.absent(); } - @Override - public Optional<AsyncExecutionFailed> getError(String jobId, String username) { - Inbox inbox = Inbox.create(system); - inbox.send(controller, new FetchError(jobId, username)); - Object receive = inbox.receive(Duration.create(1, TimeUnit.MINUTES)); - Optional<AsyncExecutionFailed> result = (Optional<AsyncExecutionFailed>) receive; - return result; - } + @Override + public Optional<Failure> getError(String jobId, String username) { + Inbox inbox = Inbox.create(system); + inbox.send(controller, new FetchError(jobId, username)); + Object receive = inbox.receive(Duration.create(1, TimeUnit.MINUTES)); + Optional<Failure> result = (Optional<Failure>) receive; + return result; + } } http://git-wip-us.apache.org/repos/asf/ambari/blob/60057378/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/client/ConnectionConfig.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/client/ConnectionConfig.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/client/ConnectionConfig.java index 4ae5577..40cdd28 100644 --- a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/client/ConnectionConfig.java +++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/client/ConnectionConfig.java @@ -19,6 +19,7 @@ package org.apache.ambari.view.hive2.client; import org.apache.ambari.view.hive2.actor.message.Connect; +import org.apache.ambari.view.hive2.actor.message.HiveJob; public class ConnectionConfig { private final String username; @@ -47,5 +48,9 @@ public class ConnectionConfig { return new Connect(username, password, jdbcUrl); } + public Connect createConnectMessage(String jobId) { + return new Connect(jobId, username, password, jdbcUrl); + } + } http://git-wip-us.apache.org/repos/asf/ambari/blob/60057378/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/client/DDLDelegatorImpl.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/client/DDLDelegatorImpl.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/client/DDLDelegatorImpl.java index 52be70e..72eca4c 100644 --- a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/client/DDLDelegatorImpl.java +++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/client/DDLDelegatorImpl.java @@ -28,13 +28,11 @@ import com.google.common.collect.FluentIterable; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import org.apache.ambari.view.ViewContext; -import org.apache.ambari.view.hive2.utils.HiveActorConfiguration; -import org.apache.ambari.view.hive2.utils.ServiceFormattedException; import org.apache.ambari.view.hive2.actor.message.Connect; import org.apache.ambari.view.hive2.actor.message.ExecuteJob; import org.apache.ambari.view.hive2.actor.message.GetColumnMetadataJob; import org.apache.ambari.view.hive2.actor.message.HiveJob; -import org.apache.ambari.view.hive2.actor.message.SyncJob; +import org.apache.ambari.view.hive2.actor.message.SQLStatementJob; import org.apache.ambari.view.hive2.actor.message.job.ExecutionFailed; import org.apache.ambari.view.hive2.actor.message.job.FetchFailed; import org.apache.ambari.view.hive2.actor.message.job.Next; @@ -42,6 +40,8 @@ import org.apache.ambari.view.hive2.actor.message.job.NoMoreItems; import org.apache.ambari.view.hive2.actor.message.job.NoResult; import org.apache.ambari.view.hive2.actor.message.job.Result; import org.apache.ambari.view.hive2.actor.message.job.ResultSetHolder; +import org.apache.ambari.view.hive2.utils.HiveActorConfiguration; +import org.apache.ambari.view.hive2.utils.ServiceFormattedException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.concurrent.duration.Duration; @@ -144,7 +144,7 @@ public class DDLDelegatorImpl implements DDLDelegator { private Optional<Result> getRowsFromDB(ConnectionConfig config, String[] statements) { Connect connect = config.createConnectMessage(); - HiveJob job = new SyncJob(config.getUsername(), statements, context); + HiveJob job = new SQLStatementJob(HiveJob.Type.SYNC, statements, config.getUsername(), context); ExecuteJob execute = new ExecuteJob(connect, job); LOG.info("Executing query: {}, for user: {}", getJoinedStatements(statements), job.getUsername()); http://git-wip-us.apache.org/repos/asf/ambari/blob/60057378/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/internal/Either.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/internal/Either.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/internal/Either.java index 52fb01e..4f25552 100644 --- a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/internal/Either.java +++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/internal/Either.java @@ -23,54 +23,55 @@ import com.google.common.base.Optional; /** * Simple implementation of a container class which can * hold one of two values - * + * <p> * Callers should check if the value if left or right before * trying to get the value * * @param <L> Left Value * @param <R> Right value */ -public class Either<L,R> { +public class Either<L, R> { - private final Optional<L> left; - private final Optional<R> right; + private final Optional<L> left; + private final Optional<R> right; - public boolean isLeft(){ - return left.isPresent() && !right.isPresent(); - } + public boolean isLeft() { + return left.isPresent() && !right.isPresent(); + } - public boolean isRight(){ - return !left.isPresent() && right.isPresent(); - } + public boolean isRight() { + return !left.isPresent() && right.isPresent(); + } - public L getLeft(){ - return left.orNull(); - } + public boolean isNone() { return !(left.isPresent() || right.isPresent()); } - public R getRight(){ - return right.orNull(); - } + public L getLeft() { + return left.orNull(); + } + public R getRight() { + return right.orNull(); + } - private Either(Optional<L> left, Optional<R> right) { - this.left = left; - this.right = right; - } + private Either(Optional<L> left, Optional<R> right) { + this.left = left; + this.right = right; + } - public static <L,R> Either<L,R> left(L value) { - return new Either<>(Optional.of(value), Optional.<R>absent()); - } + public static <L, R> Either<L, R> left(L value) { + return new Either<>(Optional.of(value), Optional.<R>absent()); + } - public static <L,R> Either<L,R> right(R value) { - return new Either<>(Optional.<L>absent(), Optional.of(value)); - } + public static <L, R> Either<L, R> right(R value) { + return new Either<>(Optional.<L>absent(), Optional.of(value)); + } - public static <L,R> Either<L,R> none() { - return new Either<>(Optional.<L>absent(), Optional.<R>absent()); - } + public static <L, R> Either<L, R> none() { + return new Either<>(Optional.<L>absent(), Optional.<R>absent()); + } } http://git-wip-us.apache.org/repos/asf/ambari/blob/60057378/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/Aggregator.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/Aggregator.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/Aggregator.java index e7553dd..4293b1b 100644 --- a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/Aggregator.java +++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/Aggregator.java @@ -163,11 +163,6 @@ public class Aggregator { !viewJob.getStatus().equalsIgnoreCase(tezDagId.status)) { viewJob.setDagId(tezDagId.entity); viewJob.setApplicationId(tezDagId.applicationId); - if (!(viewJob.getStatus().equalsIgnoreCase(Job.JOB_STATE_FINISHED) - || (viewJob.getStatus().equalsIgnoreCase(Job.JOB_STATE_ERROR)) - || viewJob.getStatus().equalsIgnoreCase(Job.JOB_STATE_CANCELED))) { - viewJob.setStatus(tezDagId.status); - } viewJobResourceManager.update(viewJob, viewJob.getId()); } } @@ -190,8 +185,6 @@ public class Aggregator { if (atsHiveQuery.dagNames != null && atsHiveQuery.dagNames.size() > 0) atsJob.setDagName(atsHiveQuery.dagNames.get(0)); atsJob.setDagId(atsTezDag.entity); - if (atsTezDag.status != null && !atsTezDag.status.equals(TezDagId.STATUS_UNKNOWN)) - atsJob.setStatus(atsTezDag.status); if (atsHiveQuery.starttime != 0) atsJob.setDateSubmitted(atsHiveQuery.starttime); atsJob.setDuration(atsHiveQuery.duration); http://git-wip-us.apache.org/repos/asf/ambari/blob/60057378/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/JobService.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/JobService.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/JobService.java index 4fa7f2a..975036e 100644 --- a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/JobService.java +++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/JobService.java @@ -22,7 +22,8 @@ import com.beust.jcommander.internal.Lists; import com.google.common.base.Optional; import org.apache.ambari.view.ViewResourceHandler; import org.apache.ambari.view.hive2.BaseService; -import org.apache.ambari.view.hive2.actor.message.job.AsyncExecutionFailed; +import org.apache.ambari.view.hive2.ConnectionSystem; +import org.apache.ambari.view.hive2.actor.message.job.Failure; import org.apache.ambari.view.hive2.backgroundjobs.BackgroundJobController; import org.apache.ambari.view.hive2.client.AsyncJobRunner; import org.apache.ambari.view.hive2.client.AsyncJobRunnerImpl; @@ -42,7 +43,6 @@ import org.apache.ambari.view.hive2.utils.MisconfigurationFormattedException; import org.apache.ambari.view.hive2.utils.NotFoundFormattedException; import org.apache.ambari.view.hive2.utils.ServiceFormattedException; import org.apache.ambari.view.hive2.utils.SharedObjectsFactory; -import org.apache.ambari.view.hive2.ConnectionSystem; import org.apache.commons.beanutils.PropertyUtils; import org.apache.commons.csv.CSVFormat; import org.apache.commons.csv.CSVPrinter; @@ -130,10 +130,10 @@ public class JobService extends BaseService { JobController jobController = getResourceManager().readController(jobId); Job job = jobController.getJob(); - if(job.getStatus().equals(Job.JOB_STATE_ERROR)){ + if(job.getStatus().equals(Job.JOB_STATE_ERROR) || job.getStatus().equals(Job.JOB_STATE_CANCELED)){ ConnectionSystem system = ConnectionSystem.getInstance(); final AsyncJobRunner asyncJobRunner = new AsyncJobRunnerImpl(context, system.getOperationController(context), system.getActorSystem()); - Optional<AsyncExecutionFailed> error = asyncJobRunner.getError(jobId, context.getUsername()); + Optional<Failure> error = asyncJobRunner.getError(jobId, context.getUsername()); if(error.isPresent()){ throw new Exception(error.get().getError()); } http://git-wip-us.apache.org/repos/asf/ambari/blob/60057378/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/ResultsPaginationController.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/ResultsPaginationController.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/ResultsPaginationController.java index bb0d053..d78b4ee 100644 --- a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/ResultsPaginationController.java +++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/ResultsPaginationController.java @@ -30,6 +30,8 @@ import org.apache.ambari.view.hive2.client.HiveClientException; import org.apache.ambari.view.hive2.client.Row; import org.apache.ambari.view.hive2.utils.BadRequestFormattedException; import org.apache.ambari.view.hive2.utils.HiveClientFormattedException; +import org.apache.ambari.view.hive2.utils.ResultFetchFormattedException; +import org.apache.ambari.view.hive2.utils.ResultNotReadyFormattedException; import org.apache.ambari.view.hive2.utils.ServiceFormattedException; import org.apache.commons.collections4.map.PassiveExpiringMap; import org.apache.hadoop.hbase.util.Strings; @@ -113,8 +115,8 @@ public class ResultsPaginationController { if (resultSet.isResettable()) { resultSet.reset(); } - } catch (HiveClientException ex) { - throw new HiveClientFormattedException(ex); + } catch (ResultNotReadyFormattedException | ResultFetchFormattedException ex) { + throw ex; } catch (Exception ex) { throw new ServiceFormattedException(ex.getMessage(), ex); } http://git-wip-us.apache.org/repos/asf/ambari/blob/60057378/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/viewJobs/JobControllerImpl.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/viewJobs/JobControllerImpl.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/viewJobs/JobControllerImpl.java index a3edc8f..aab6e71 100644 --- a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/viewJobs/JobControllerImpl.java +++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/viewJobs/JobControllerImpl.java @@ -19,6 +19,10 @@ package org.apache.ambari.view.hive2.resources.jobs.viewJobs; import org.apache.ambari.view.ViewContext; +import org.apache.ambari.view.hive2.ConnectionFactory; +import org.apache.ambari.view.hive2.ConnectionSystem; +import org.apache.ambari.view.hive2.actor.message.HiveJob; +import org.apache.ambari.view.hive2.actor.message.SQLStatementJob; import org.apache.ambari.view.hive2.client.AsyncJobRunner; import org.apache.ambari.view.hive2.client.AsyncJobRunnerImpl; import org.apache.ambari.view.hive2.client.ConnectionConfig; @@ -32,9 +36,6 @@ import org.apache.ambari.view.hive2.utils.BadRequestFormattedException; import org.apache.ambari.view.hive2.utils.FilePaginator; import org.apache.ambari.view.hive2.utils.MisconfigurationFormattedException; import org.apache.ambari.view.hive2.utils.ServiceFormattedException; -import org.apache.ambari.view.hive2.ConnectionFactory; -import org.apache.ambari.view.hive2.ConnectionSystem; -import org.apache.ambari.view.hive2.actor.message.AsyncJob; import org.apache.ambari.view.utils.hdfs.HdfsApi; import org.apache.ambari.view.utils.hdfs.HdfsApiException; import org.apache.ambari.view.utils.hdfs.HdfsUtil; @@ -106,7 +107,7 @@ public class JobControllerImpl implements JobController, ModifyNotificationDeleg String query = getQueryForJob(); ConnectionSystem system = ConnectionSystem.getInstance(); AsyncJobRunner asyncJobRunner = new AsyncJobRunnerImpl(context, system.getOperationController(context), system.getActorSystem()); - AsyncJob asyncJob = new AsyncJob(job.getId(), context.getUsername(), getStatements(jobDatabase, query), job.getLogFile(), context); + SQLStatementJob asyncJob = new SQLStatementJob(HiveJob.Type.ASYNC, getStatements(jobDatabase, query), context.getUsername(), job.getId(), job.getLogFile(), context); asyncJobRunner.submitJob(getHiveConnectionConfig(), asyncJob, job); } @@ -122,7 +123,9 @@ public class JobControllerImpl implements JobController, ModifyNotificationDeleg @Override public void cancel() throws ItemNotFound { - //TODO: Cancel job + ConnectionSystem system = ConnectionSystem.getInstance(); + AsyncJobRunner asyncJobRunner = new AsyncJobRunnerImpl(context, system.getOperationController(context), system.getActorSystem()); + asyncJobRunner.cancelJob(job.getId(), context.getUsername()); } @Override http://git-wip-us.apache.org/repos/asf/ambari/blob/60057378/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/utils/ResultFetchFormattedException.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/utils/ResultFetchFormattedException.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/utils/ResultFetchFormattedException.java new file mode 100644 index 0000000..50a5b63 --- /dev/null +++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/utils/ResultFetchFormattedException.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.view.hive2.utils; + +public class ResultFetchFormattedException extends ServiceFormattedException { + private final static int STATUS = 500; + + public ResultFetchFormattedException(String message, Throwable exception) { + super(message, exception, STATUS); + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/60057378/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/utils/ResultNotReadyFormattedException.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/utils/ResultNotReadyFormattedException.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/utils/ResultNotReadyFormattedException.java new file mode 100644 index 0000000..16f7df9 --- /dev/null +++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/utils/ResultNotReadyFormattedException.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.view.hive2.utils; + +public class ResultNotReadyFormattedException extends ServiceFormattedException { + private final static int STATUS = 409; + + public ResultNotReadyFormattedException(String message, Throwable exception) { + super(message, exception, STATUS); + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/60057378/contrib/views/hive-next/src/main/resources/ui/hive-web/app/controllers/index.js ---------------------------------------------------------------------- diff --git a/contrib/views/hive-next/src/main/resources/ui/hive-web/app/controllers/index.js b/contrib/views/hive-next/src/main/resources/ui/hive-web/app/controllers/index.js index 9584508..4f3261c 100644 --- a/contrib/views/hive-next/src/main/resources/ui/hive-web/app/controllers/index.js +++ b/contrib/views/hive-next/src/main/resources/ui/hive-web/app/controllers/index.js @@ -204,13 +204,23 @@ export default Ember.Controller.extend({ getVisualExplainJson: function (job, originalModel) { var self = this; var defer = Ember.RSVP.defer(); + var attempt = 3; - job.save().then(function () { + var getResult = function() { self.get('results').getResultsJson(job).then(function (json) { defer.resolve(json); }, function (err) { - defer.reject(err); + if(err.status === 409 && attempt > 0) { + attempt--; + Ember.run.later(self, getResult, 3000); // Retry after 3 seconds + } else { + defer.reject(err); + } }); + }; + + job.save().then(function () { + Ember.run.later(self, getResult, 1000); }, function (err) { defer.reject(err); }); http://git-wip-us.apache.org/repos/asf/ambari/blob/60057378/contrib/views/hive-next/src/main/resources/ui/hive-web/app/controllers/upload-table.js ---------------------------------------------------------------------- diff --git a/contrib/views/hive-next/src/main/resources/ui/hive-web/app/controllers/upload-table.js b/contrib/views/hive-next/src/main/resources/ui/hive-web/app/controllers/upload-table.js index 3f17760..cb267a0 100644 --- a/contrib/views/hive-next/src/main/resources/ui/hive-web/app/controllers/upload-table.js +++ b/contrib/views/hive-next/src/main/resources/ui/hive-web/app/controllers/upload-table.js @@ -185,7 +185,9 @@ export default Ember.Controller.extend({ reject(new Error(status)); } else { console.log("retrying waitForJobStatus : "); - self.waitForJobStatus(jobId, resolve, reject); + Ember.run.later(self, function() { + this.waitForJobStatus(jobId, resolve, reject); + }, 1000); } }, function (error) { console.log("rejecting waitForJobStatus with : " + error); http://git-wip-us.apache.org/repos/asf/ambari/blob/60057378/contrib/views/hive-next/src/main/resources/ui/hive-web/app/services/job-progress.js ---------------------------------------------------------------------- diff --git a/contrib/views/hive-next/src/main/resources/ui/hive-web/app/services/job-progress.js b/contrib/views/hive-next/src/main/resources/ui/hive-web/app/services/job-progress.js index 1e0b96b..8432b84 100644 --- a/contrib/views/hive-next/src/main/resources/ui/hive-web/app/services/job-progress.js +++ b/contrib/views/hive-next/src/main/resources/ui/hive-web/app/services/job-progress.js @@ -93,6 +93,15 @@ export default Ember.Service.extend({ } else { job.set('retrievingProgress'); } + }, function() { + console.log("Failed to fetch progress. Retrying."); + if (job.get('model.isRunning')) { + Ember.run.later(function () { + self.reloadProgress(job); + }, 1000); + } else { + job.set('retrievingProgress'); + } }); }, http://git-wip-us.apache.org/repos/asf/ambari/blob/60057378/contrib/views/hive-next/src/test/java/org/apache/ambari/view/hive2/AsyncQueriesTest.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive-next/src/test/java/org/apache/ambari/view/hive2/AsyncQueriesTest.java b/contrib/views/hive-next/src/test/java/org/apache/ambari/view/hive2/AsyncQueriesTest.java deleted file mode 100644 index 38ed2b3..0000000 --- a/contrib/views/hive-next/src/test/java/org/apache/ambari/view/hive2/AsyncQueriesTest.java +++ /dev/null @@ -1,124 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ambari.view.hive2; - - -import akka.actor.ActorRef; -import akka.actor.ActorSystem; -import akka.actor.Inbox; -import akka.actor.Props; -import akka.testkit.JavaTestKit; -import org.apache.ambari.view.hive2.actor.OperationController; -import org.apache.ambari.view.hive2.actor.message.AsyncJob; -import org.apache.ambari.view.hive2.actor.message.ExecuteJob; -import org.apache.ambari.view.hive2.actor.message.FetchResult; -import org.apache.ambari.view.hive2.actor.message.JobSubmitted; -import org.apache.ambari.view.hive2.actor.message.job.AsyncExecutionFailed; -import org.apache.ambari.view.hive2.actor.message.job.Next; -import org.apache.ambari.view.hive2.internal.ConnectionException; -import org.apache.ambari.view.hive2.internal.Either; -import org.apache.ambari.view.hive2.internal.HiveResult; -import org.apache.hive.jdbc.HiveStatement; -import org.apache.log4j.Level; -import org.apache.log4j.Logger; -import org.junit.After; -import org.junit.Before; -import org.junit.Ignore; -import org.junit.Test; -import scala.concurrent.duration.Duration; - -import java.sql.SQLException; -import java.util.List; -import java.util.concurrent.TimeUnit; - -import static org.easymock.EasyMock.*; -import static org.junit.Assert.assertTrue; - -public class AsyncQueriesTest extends MockSupport { - - - private static ActorSystem actorSystem; - - @Before - public void setup() { - actorSystem = ActorSystem.create("TestingActorSystem"); - Logger.getRootLogger().setLevel(Level.DEBUG); - } - - @After - public void teardown() { - JavaTestKit.shutdownActorSystem(actorSystem); - } - - - /** - * Test the actor inactivity timer - * Send the actor a message and dont care about the result - * - * @throws SQLException - * @throws ConnectionException - * @throws InterruptedException - */ - @Test - @Ignore - public void testAsyncQuerySubmission() throws SQLException, ConnectionException, InterruptedException { - mockDependencies(); - setUpDefaultExpectations(); - String[] statements = {"select * from test"}; - AsyncJob job = new AsyncJob("10", "admin", statements, "tst.log", viewContext); - for (String s : statements) { - expect(((HiveStatement) statement).executeAsync(s)).andReturn(true); - } - - ActorRef operationControl = actorSystem.actorOf( - Props.create(OperationController.class, actorSystem, connectionSupplier, supplier, hdfsSupplier), "operationController-test"); - - Inbox inbox = Inbox.create(actorSystem); - - ExecuteJob executeJob = new ExecuteJob(connect, job); - inbox.send(operationControl, executeJob); - - replay(connection, resultSet, resultSetMetaData, statement, viewContext, connect, connectable, hdfsSupplier, hdfsApi, supplier, connectionSupplier); - - try { - - Object submitted = inbox.receive(Duration.create(1, TimeUnit.MINUTES)); - - assertTrue(submitted instanceof JobSubmitted); - inbox.send(operationControl,new FetchResult("10","admin")); - - Either<ActorRef, AsyncExecutionFailed> receive = (Either<ActorRef, AsyncExecutionFailed>) inbox.receive(Duration.create(1, TimeUnit.MINUTES)); - - inbox.send(receive.getLeft(),new Next()); - - HiveResult result = (HiveResult)inbox.receive(Duration.create(1, TimeUnit.MINUTES)); - - List<HiveResult.Row> rows = result.getRows(); - System.out.println(rows); - - verify(connection, resultSet, resultSetMetaData, statement, viewContext, connect, connectable, hdfsSupplier, hdfsApi, supplier); - - - } catch (Throwable e) { - e.printStackTrace(); - } - - - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/60057378/contrib/views/hive-next/src/test/java/org/apache/ambari/view/hive2/InactivityTest.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive-next/src/test/java/org/apache/ambari/view/hive2/InactivityTest.java b/contrib/views/hive-next/src/test/java/org/apache/ambari/view/hive2/InactivityTest.java deleted file mode 100644 index 16405b2..0000000 --- a/contrib/views/hive-next/src/test/java/org/apache/ambari/view/hive2/InactivityTest.java +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ambari.view.hive2; - - -import akka.actor.ActorRef; -import akka.actor.ActorSystem; -import akka.actor.Inbox; -import akka.actor.Props; -import akka.testkit.JavaTestKit; -import org.apache.ambari.view.hive2.actor.OperationController; -import org.apache.ambari.view.hive2.actor.message.AsyncJob; -import org.apache.ambari.view.hive2.actor.message.ExecuteJob; -import org.apache.ambari.view.hive2.actor.message.SyncJob; -import org.apache.ambari.view.hive2.internal.ConnectionException; -import org.apache.hive.jdbc.HiveStatement; -import org.apache.log4j.Level; -import org.apache.log4j.Logger; -import org.easymock.EasyMock; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Ignore; -import org.junit.Test; - -import java.sql.SQLException; - -import static org.easymock.EasyMock.*; - -public class InactivityTest extends MockSupport { - - - private static ActorSystem actorSystem; - - @BeforeClass - public static void setup() { - actorSystem = ActorSystem.create("TestingActorSystem"); - Logger.getRootLogger().setLevel(Level.DEBUG); - } - - @AfterClass - public static void teardown() { - JavaTestKit.shutdownActorSystem(actorSystem); - } - - - /** - * Test the actor inactivity timer - * Send the actor a message and dont care about the result - * - * @throws SQLException - * @throws ConnectionException - * @throws InterruptedException - */ - @Test - @Ignore - public void testActorInactivityTimer() throws SQLException, ConnectionException, InterruptedException { - mockDependencies(); - setUpDefaultExpectations(); - reset(resultSet); - reset(resultSetMetaData); - statement.close(); - resultSet.close(); - - - String[] statements = {"select * from test"}; - AsyncJob job = new AsyncJob("100","admin", statements,"tst.log" ,viewContext); - for (String s : statements) { - expect(((HiveStatement)statement).executeAsync(s)).andReturn(true); - } - - ActorRef operationControl = actorSystem.actorOf( - Props.create(OperationController.class, actorSystem, connectionSupplier, supplier, hdfsSupplier), "operationController-test"); - - Inbox inbox = Inbox.create(actorSystem); - - ExecuteJob executeJob = new ExecuteJob(connect, job); - inbox.send(operationControl, executeJob); - - replay(connection, resultSet, resultSetMetaData, statement, viewContext, connect, connectable, hdfsSupplier, hdfsApi, supplier, connectionSupplier); - - //allow inactivity timer to fire - Thread.sleep(62000); - - verify(connection, resultSet, resultSetMetaData, statement, viewContext, connect, connectable, hdfsSupplier, hdfsApi, supplier, connectionSupplier); - - - - - - } - - -} http://git-wip-us.apache.org/repos/asf/ambari/blob/60057378/contrib/views/hive-next/src/test/java/org/apache/ambari/view/hive2/Mocksupport.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive-next/src/test/java/org/apache/ambari/view/hive2/Mocksupport.java b/contrib/views/hive-next/src/test/java/org/apache/ambari/view/hive2/Mocksupport.java deleted file mode 100644 index b7e6320..0000000 --- a/contrib/views/hive-next/src/test/java/org/apache/ambari/view/hive2/Mocksupport.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ambari.view.hive2; - -import com.google.common.base.Optional; -import org.apache.ambari.view.ViewContext; -import org.apache.ambari.view.hive2.persistence.DataStoreStorage; -import org.apache.ambari.view.hive2.actor.message.Connect; -import org.apache.ambari.view.hive2.internal.Connectable; -import org.apache.ambari.view.hive2.internal.ConnectionException; -import org.apache.ambari.view.hive2.internal.ConnectionSupplier; -import org.apache.ambari.view.hive2.internal.DataStorageSupplier; -import org.apache.ambari.view.hive2.internal.HdfsApiSupplier; -import org.apache.ambari.view.utils.hdfs.HdfsApi; -import org.apache.hive.jdbc.HiveConnection; -import org.apache.hive.jdbc.HiveStatement; - -import java.sql.ResultSet; -import java.sql.ResultSetMetaData; -import java.sql.SQLException; -import java.sql.Statement; - -import static org.easymock.EasyMock.createNiceMock; -import static org.easymock.EasyMock.expect; - -abstract class MockSupport { - - - protected HiveJdbcConnectionDelegate connectionDelegate; - protected HiveConnection connection; - protected Statement statement; - protected ResultSet resultSet; - protected DataStorageSupplier supplier; - protected HdfsApiSupplier hdfsSupplier; - protected ConnectionSupplier connectionSupplier; - protected HdfsApi hdfsApi; - protected ViewContext viewContext; - protected Connect connect; - protected ResultSetMetaData resultSetMetaData; - protected Connectable connectable; - - public void setUpDefaultExpectations() throws SQLException, ConnectionException { - expect(supplier.get(viewContext)).andReturn(new DataStoreStorage(viewContext)); - expect(hdfsSupplier.get(viewContext)).andReturn(Optional.fromNullable(hdfsApi)).anyTimes(); - expect(connection.createStatement()).andReturn(statement); - expect(connect.getConnectable()).andReturn(connectable); - expect(connectable.isOpen()).andReturn(false); - Optional<HiveConnection> connectionOptional = Optional.of(connection); - expect(connectable.getConnection()).andReturn(connectionOptional).anyTimes(); - expect(connectionSupplier.get(viewContext)).andReturn(connectionDelegate).times(1); - expect(statement.getResultSet()).andReturn(resultSet); - expect(resultSet.getMetaData()).andReturn(resultSetMetaData); - expect(resultSetMetaData.getColumnCount()).andReturn(1); - expect(resultSetMetaData.getColumnName(1)).andReturn("test"); - expect(resultSet.next()).andReturn(true); - expect(resultSet.getObject(1)).andReturn("test"); - - connectable.connect(); - } - - public void mockDependencies() { - connectionDelegate = new HiveJdbcConnectionDelegate(); - connection = createNiceMock(HiveConnection.class); - statement = createNiceMock(HiveStatement.class); - resultSet = createNiceMock(ResultSet.class); - supplier = createNiceMock(DataStorageSupplier.class); - hdfsSupplier = createNiceMock(HdfsApiSupplier.class); - connectionSupplier = createNiceMock(ConnectionSupplier.class); - hdfsApi = createNiceMock(HdfsApi.class); - viewContext = createNiceMock(ViewContext.class); - connect = createNiceMock(Connect.class); - resultSetMetaData = createNiceMock(ResultSetMetaData.class); - connectable = createNiceMock(Connectable.class); - } - - - -} http://git-wip-us.apache.org/repos/asf/ambari/blob/60057378/contrib/views/hive-next/src/test/java/org/apache/ambari/view/hive2/SyncQueriesTest.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive-next/src/test/java/org/apache/ambari/view/hive2/SyncQueriesTest.java b/contrib/views/hive-next/src/test/java/org/apache/ambari/view/hive2/SyncQueriesTest.java deleted file mode 100644 index a656c4e..0000000 --- a/contrib/views/hive-next/src/test/java/org/apache/ambari/view/hive2/SyncQueriesTest.java +++ /dev/null @@ -1,141 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ambari.view.hive2; - - -import akka.actor.ActorRef; -import akka.actor.ActorSystem; -import akka.actor.Inbox; -import akka.actor.Props; -import akka.testkit.JavaTestKit; -import org.apache.ambari.view.hive2.client.Row; -import org.apache.ambari.view.hive2.actor.OperationController; -import org.apache.ambari.view.hive2.actor.message.ExecuteJob; -import org.apache.ambari.view.hive2.actor.message.SyncJob; -import org.apache.ambari.view.hive2.actor.message.job.ExecutionFailed; -import org.apache.ambari.view.hive2.actor.message.job.FetchFailed; -import org.apache.ambari.view.hive2.actor.message.job.Next; -import org.apache.ambari.view.hive2.actor.message.job.NoMoreItems; -import org.apache.ambari.view.hive2.actor.message.job.NoResult; -import org.apache.ambari.view.hive2.actor.message.job.Result; -import org.apache.ambari.view.hive2.actor.message.job.ResultSetHolder; -import org.apache.ambari.view.hive2.internal.ConnectionException; -import org.apache.log4j.Level; -import org.apache.log4j.Logger; -import org.junit.After; -import org.junit.Before; -import org.junit.Ignore; -import org.junit.Test; -import scala.concurrent.duration.Duration; - -import java.sql.SQLException; -import java.util.List; -import java.util.concurrent.TimeUnit; - -import static org.easymock.EasyMock.*; -import static org.junit.Assert.*; - -public class SyncQueriesTest extends MockSupport { - - - private ActorSystem actorSystem; - - @Before - public void setup() { - actorSystem = ActorSystem.create("TestingActorSystem"); - Logger.getRootLogger().setLevel(Level.DEBUG); - } - - @After - public void teardown() { - JavaTestKit.shutdownActorSystem(actorSystem); - } - - - - @Test - @Ignore - public void testSyncJobSubmission() throws SQLException, ConnectionException, InterruptedException { - mockDependencies(); - setUpDefaultExpectations(); - String[] statements = {"select * from test"}; - SyncJob job = new SyncJob("admin", statements,viewContext); - for (String s : statements) { - expect(statement.execute(s)).andReturn(true); - } - - ActorRef operationControl = actorSystem.actorOf( - Props.create(OperationController.class, actorSystem, connectionSupplier, supplier, hdfsSupplier), "operationController-test"); - - Inbox inbox = Inbox.create(actorSystem); - - ExecuteJob executeJob = new ExecuteJob(connect, job); - inbox.send(operationControl, executeJob); - - replay(connection, resultSet, resultSetMetaData, statement, viewContext, connect, connectable, hdfsSupplier, hdfsApi, supplier, connectionSupplier); - - try { - - Object jdbcResult = inbox.receive(Duration.create(1, TimeUnit.MINUTES)); - - if (jdbcResult instanceof NoResult) { - fail(); - } else if (jdbcResult instanceof ExecutionFailed) { - - ExecutionFailed error = (ExecutionFailed) jdbcResult; - fail(); - error.getError().printStackTrace(); - - } else if (jdbcResult instanceof ResultSetHolder) { - ResultSetHolder holder = (ResultSetHolder) jdbcResult; - ActorRef iterator = holder.getIterator(); - - inbox.send(iterator, new Next()); - Object receive = inbox.receive(Duration.create(1, TimeUnit.MINUTES)); - - - Result result = (Result) receive; - List<Row> rows = result.getRows(); - System.out.println("Fetched " + rows.size() + " entries."); - for (Row row : rows) { - assertArrayEquals(row.getRow(), new String[]{"test"}); - } - - inbox.send(iterator, new Next()); - receive = inbox.receive(Duration.create(1, TimeUnit.MINUTES)); - assertTrue(receive instanceof NoMoreItems); - - - if (receive instanceof FetchFailed) { - fail(); - } - - } - - } catch (Throwable ex) { - fail(); - } - - - verify(connection, resultSet, resultSetMetaData, statement, viewContext, connect, connectable, hdfsSupplier, hdfsApi, supplier); - - } - - -}
