http://git-wip-us.apache.org/repos/asf/ambari/blob/853a1ce7/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/DatabaseChangeNotifier.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/DatabaseChangeNotifier.java b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/DatabaseChangeNotifier.java new file mode 100644 index 0000000..37f24d2 --- /dev/null +++ b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/DatabaseChangeNotifier.java @@ -0,0 +1,168 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ambari.view.hive20.actor; + +import akka.actor.ActorRef; +import akka.actor.PoisonPill; +import akka.actor.Props; +import com.google.common.collect.Sets; +import org.apache.ambari.view.hive20.actor.message.HiveMessage; +import org.apache.ambari.view.hive20.internal.dto.TableInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +/** + * + */ +public class DatabaseChangeNotifier extends HiveActor { + private final Logger LOG = LoggerFactory.getLogger(getClass()); + + private String currentDatabaseName; + private Map<String, TableWrapper> tables = new HashMap<>(); + private Map<String, TableInfo> newTables = new HashMap<>(); + + @Override + public void handleMessage(HiveMessage hiveMessage) { + Object message = hiveMessage.getMessage(); + if(message instanceof DatabaseAdded) { + handleDatabaseAdded((DatabaseAdded) message); + } else if ( message instanceof DatabaseRemoved) { + handleDatabaseRemoved((DatabaseRemoved) message); + } else if (message instanceof TableUpdated) { + handleTableUpdated((TableUpdated) message); + } else if (message instanceof AllTablesUpdated) { + handleAllTableUpdated((AllTablesUpdated) message); + } + } + + private void handleDatabaseAdded(DatabaseAdded message) { + LOG.info("Database Added: {}", message.name); + currentDatabaseName = message.name; + // TODO: Send event to eventbus + } + + private void handleDatabaseRemoved(DatabaseRemoved message) { + LOG.info("Database Removed: {}", message.name); + // TODO: Send event to eventbus + } + + private void handleTableUpdated(TableUpdated message) { + LOG.info("XXXXX: table xxxx. 
Size: {}", newTables.size()); + newTables.put(message.info.getName(), message.info); + } + + private void handleAllTableUpdated(AllTablesUpdated message) { + Set<String> oldTableNames = new HashSet<>(tables.keySet()); + Set<String> newTableNames = new HashSet<>(newTables.keySet()); + + Set<String> tablesAdded = Sets.difference(newTableNames, oldTableNames); + Set<String> tablesRemoved = Sets.difference(oldTableNames, newTableNames); + Set<String> tablesUpdated = Sets.intersection(oldTableNames, newTableNames); + + updateTablesAdded(tablesAdded); + updateTablesRemoved(tablesRemoved); + updateTablesUpdated(tablesUpdated); + newTables.clear(); + } + + private void updateTablesAdded(Set<String> tablesAdded) { + for (String tableName: tablesAdded) { + TableWrapper wrapper = new TableWrapper(tableName); + tables.put(tableName, wrapper); + wrapper.getTableNotifier().tell(new TableChangeNotifier.TableAdded(newTables.get(tableName)), getSelf()); + } + } + + private void updateTablesRemoved(Set<String> tablesRemoved) { + for(String tableName: tablesRemoved) { + TableWrapper tableWrapper = tables.remove(tableName); + tableWrapper.getTableNotifier().tell(new TableChangeNotifier.TableRemoved(tableName), getSelf()); + tableWrapper.getTableNotifier().tell(PoisonPill.getInstance(), getSelf()); + } + } + + private void updateTablesUpdated(Set<String> tablesUpdated) { + for(String tableName: tablesUpdated) { + TableWrapper tableWrapper = tables.get(tableName); + // TODO: Check what needs to be done here. 
+ } + } + + public static Props props() { + return Props.create(DatabaseChangeNotifier.class); + } + + public class TableWrapper { + private final String tableName; + private final ActorRef tableNotifier; + + private TableWrapper(String tableName) { + this.tableName = tableName; + this.tableNotifier = getContext().actorOf(TableChangeNotifier.props()); + } + + public String getTableName() { + return tableName; + } + + public ActorRef getTableNotifier() { + return tableNotifier; + } + } + + public static class DatabaseAdded { + private final String name; + + public DatabaseAdded(String name) { + this.name = name; + } + } + + + public static class DatabaseRemoved { + private final String name; + + public DatabaseRemoved(String name) { + this.name = name; + } + } + + public static class TableUpdated { + private final TableInfo info; + + public TableUpdated(TableInfo info) { + this.info = info; + } + } + + public static class AllTablesUpdated { + private final String database; + + public AllTablesUpdated(String database) { + this.database = database; + } + } + + +}
http://git-wip-us.apache.org/repos/asf/ambari/blob/853a1ce7/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/DatabaseManager.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/DatabaseManager.java b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/DatabaseManager.java new file mode 100644 index 0000000..6dc4ad9 --- /dev/null +++ b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/DatabaseManager.java @@ -0,0 +1,285 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ambari.view.hive20.actor; + +import akka.actor.ActorRef; +import akka.actor.PoisonPill; +import akka.actor.Props; +import com.google.common.base.Function; +import com.google.common.collect.FluentIterable; +import com.google.common.collect.Sets; +import org.apache.ambari.view.ViewContext; +import org.apache.ambari.view.hive20.AuthParams; +import org.apache.ambari.view.hive20.ConnectionFactory; +import org.apache.ambari.view.hive20.actor.message.HiveMessage; +import org.apache.ambari.view.hive20.client.ConnectionConfig; +import org.apache.ambari.view.hive20.internal.Connectable; +import org.apache.ambari.view.hive20.internal.HiveConnectionWrapper; +import org.apache.ambari.view.hive20.internal.dto.DatabaseInfo; +import org.apache.ambari.view.hive20.internal.dto.TableInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.concurrent.duration.Duration; + +import javax.annotation.Nullable; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +/** + * Manages database related state, queries Hive to get the list of databases and then manages state for each database. + * Also, periodically updates the list of databases by calling hive. 
+ */ +public class DatabaseManager extends HiveActor { + + private final Logger LOG = LoggerFactory.getLogger(getClass()); + + private final Connectable connectable; + + private final ActorRef metaDataRetriever; + private final String username; + + private boolean refreshInProgress = false; + private boolean selfRefreshQueued = false; + + private Map<String, DatabaseWrapper> databases = new HashMap<>(); + private Set<String> databasesToUpdate; + + + public DatabaseManager(String username, Connectable connectable) { + this.username = username; + this.connectable = connectable; + metaDataRetriever = getContext().actorOf(MetaDataRetriever.props(connectable)); + } + + @Override + public void handleMessage(HiveMessage hiveMessage) { + + Object message = hiveMessage.getMessage(); + if (message instanceof Refresh) { + handleRefresh(); + } else if (message instanceof SelfRefresh) { + handleSelfRefresh(); + } else if (message instanceof MetaDataRetriever.DBRefreshed) { + handleDBRefreshed((MetaDataRetriever.DBRefreshed) message); + } else if (message instanceof MetaDataRetriever.TableRefreshed) { + handleTableRefreshed((MetaDataRetriever.TableRefreshed) message); + } else if (message instanceof MetaDataRetriever.AllTableRefreshed) { + handleAllTableRefeshed((MetaDataRetriever.AllTableRefreshed) message); + } else if (message instanceof GetDatabases) { + handleGetDatabases((GetDatabases) message); + } + + } + + private void handleSelfRefresh() { + if (refreshInProgress) { + getContext().system().scheduler().scheduleOnce(Duration.create(500, TimeUnit.MILLISECONDS), + getSelf(), new SelfRefresh(), getContext().dispatcher(), getSelf()); + } else { + selfRefreshQueued = false; + refresh(); + } + } + + private void handleRefresh() { + if (refreshInProgress && selfRefreshQueued) { + return; // We will not honor refresh message when a refresh is going on and another self refresh is queued in mailbox + } else if (refreshInProgress) { + selfRefreshQueued = true; // If refresh is in 
progress, we will queue up only one refresh message. + getContext().system().scheduler().scheduleOnce(Duration.create(500, TimeUnit.MILLISECONDS), + getSelf(), new SelfRefresh(), getContext().dispatcher(), getSelf()); + } else { + refresh(); + } + } + + private void handleDBRefreshed(MetaDataRetriever.DBRefreshed message) { + Set<DatabaseInfo> databasesInfos = message.getDatabases(); + Set<String> currentDatabases = new HashSet<>(databases.keySet()); + Set<String> newDatabases = FluentIterable.from(databasesInfos).transform(new Function<DatabaseInfo, String>() { + @Nullable + @Override + public String apply(@Nullable DatabaseInfo databaseInfo) { + return databaseInfo.getName(); + } + }).toSet(); + + databasesToUpdate = new HashSet<>(newDatabases); + + Set<String> databasesAdded = Sets.difference(newDatabases, currentDatabases); + Set<String> databasesRemoved = Sets.difference(currentDatabases, newDatabases); + + updateDatabasesAdded(databasesAdded, databasesInfos); + updateDatabasesRemoved(databasesRemoved); + } + + private void updateDatabasesAdded(Set<String> databasesAdded, Set<DatabaseInfo> databasesInfos) { + for (DatabaseInfo info : databasesInfos) { + if (databasesAdded.contains(info.getName())) { + DatabaseWrapper wrapper = new DatabaseWrapper(info); + databases.put(info.getName(), wrapper); + wrapper.getDatabaseNotifier().tell(new DatabaseChangeNotifier.DatabaseAdded(info.getName()), getSelf()); + } + } + } + + private void updateDatabasesRemoved(Set<String> databasesRemoved) { + for (String database : databasesRemoved) { + DatabaseWrapper wrapper = databases.remove(database); + ActorRef notifier = wrapper.getDatabaseNotifier(); + notifier.tell(new DatabaseChangeNotifier.DatabaseRemoved(database), getSelf()); + notifier.tell(PoisonPill.getInstance(), getSelf()); + } + } + + private void handleTableRefreshed(MetaDataRetriever.TableRefreshed message) { + ActorRef databaseChangeNotifier = getDatabaseChangeNotifier(message.getDatabase()); + 
updateTable(message.getDatabase(), message.getTable()); + databaseChangeNotifier.tell(new DatabaseChangeNotifier.TableUpdated(message.getTable()), getSelf()); + } + + private void handleAllTableRefeshed(MetaDataRetriever.AllTableRefreshed message) { + ActorRef databaseChangeNotifier = getDatabaseChangeNotifier(message.getDatabase()); + databaseChangeNotifier.tell(new DatabaseChangeNotifier.AllTablesUpdated(message.getDatabase()), getSelf()); + if (checkIfAllTablesOfAllDatabaseRefeshed(message)) { + refreshInProgress = false; + } + } + + private void handleGetDatabases(GetDatabases message) { + if (refreshInProgress) { + // If currently refreshing, then schedule the same message after 500 milliseconds + getContext().system().scheduler().scheduleOnce(Duration.create(500, TimeUnit.MILLISECONDS), + getSelf(), message, getContext().dispatcher(), getSender()); + return; + } + Set<DatabaseInfo> infos = new HashSet<>(); + for (DatabaseWrapper wrapper : databases.values()) { + infos.add(wrapper.getDatabase()); + } + getSender().tell(new DatabasesResult(infos), getSelf()); + } + + private boolean checkIfAllTablesOfAllDatabaseRefeshed(MetaDataRetriever.AllTableRefreshed message) { + databasesToUpdate.remove(message.getDatabase()); + return databasesToUpdate.isEmpty(); + } + + private ActorRef getDatabaseChangeNotifier(String databaseName) { + DatabaseWrapper wrapper = databases.get(databaseName); + ActorRef databaseChangeNotifier = null; + if (wrapper != null) { + databaseChangeNotifier = wrapper.getDatabaseNotifier(); + } + return databaseChangeNotifier; + } + + private void refresh() { + LOG.info("Received refresh for user"); + refreshInProgress = true; + metaDataRetriever.tell(new MetaDataRetriever.RefreshDB(), getSelf()); + + scheduleRefreshAfter(1, TimeUnit.MINUTES); + } + + private void scheduleRefreshAfter(long time, TimeUnit timeUnit) { + getContext().system().scheduler().scheduleOnce(Duration.create(time, timeUnit), + getSelf(), new Refresh(username), 
getContext().dispatcher(), getSelf()); + } + + @Override + public void postStop() throws Exception { + LOG.info("Database Manager stopped!!!"); + connectable.disconnect(); + } + + private void updateTable(String databaseName, TableInfo table) { + DatabaseWrapper wrapper = databases.get(databaseName); + if (wrapper != null) { + DatabaseInfo info = wrapper.getDatabase(); + info.getTables().add(table); + } + } + + public static Props props(ViewContext context) { + ConnectionConfig config = ConnectionFactory.create(context); + Connectable connectable = new HiveConnectionWrapper(config.getJdbcUrl(), config.getUsername(), config.getPassword(), new AuthParams(context)); + return Props.create(DatabaseManager.class, config.getUsername(), connectable); + } + + public static class Refresh { + private final String username; + + public Refresh(String username) { + this.username = username; + } + + public String getUsername() { + return username; + } + } + + private static class SelfRefresh { + } + + private class DatabaseWrapper { + private final DatabaseInfo database; + private final ActorRef databaseNotifier; + + private DatabaseWrapper(DatabaseInfo database) { + this.database = database; + databaseNotifier = getContext().actorOf(DatabaseChangeNotifier.props()); + } + + public DatabaseInfo getDatabase() { + return database; + } + + public ActorRef getDatabaseNotifier() { + return databaseNotifier; + } + } + + public static class GetDatabases { + private final String username; + + public GetDatabases(String username) { + this.username = username; + } + + public String getUsername() { + return username; + } + } + + public static class DatabasesResult { + private final Set<DatabaseInfo> databases; + + public DatabasesResult(Set<DatabaseInfo> databases) { + this.databases = databases; + } + + public Set<DatabaseInfo> getDatabases() { + return databases; + } + } +} 
http://git-wip-us.apache.org/repos/asf/ambari/blob/853a1ce7/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/DeathWatch.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/DeathWatch.java b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/DeathWatch.java new file mode 100644 index 0000000..58cefcd --- /dev/null +++ b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/DeathWatch.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ambari.view.hive20.actor; + +import akka.actor.ActorRef; +import akka.actor.Terminated; +import org.apache.ambari.view.hive20.actor.message.HiveMessage; +import org.apache.ambari.view.hive20.actor.message.RegisterActor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Date; + +public class DeathWatch extends HiveActor { + + private final static Logger LOG = + LoggerFactory.getLogger(DeathWatch.class); + + @Override + public void handleMessage(HiveMessage hiveMessage) { + Object message = hiveMessage.getMessage(); + if(message instanceof RegisterActor){ + RegisterActor registerActor = (RegisterActor) message; + ActorRef actorRef = registerActor.getActorRef(); + this.getContext().watch(actorRef); + LOG.info("Registered new actor "+ actorRef); + LOG.info("Registration for {} at {}", actorRef,new Date()); + } + if(message instanceof Terminated){ + Terminated terminated = (Terminated) message; + ActorRef actor = terminated.actor(); + LOG.info("Received terminate for actor "+ actor); + LOG.info("Termination for {} at {}", actor,new Date()); + + } + + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/853a1ce7/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/HiveActor.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/HiveActor.java b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/HiveActor.java new file mode 100644 index 0000000..384b798 --- /dev/null +++ b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/HiveActor.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.view.hive20.actor; + +import akka.actor.UntypedActor; +import org.apache.ambari.view.hive20.actor.message.HiveMessage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class HiveActor extends UntypedActor { + + private final Logger LOG = LoggerFactory.getLogger(getClass()); + + @Override + final public void onReceive(Object message) throws Exception { + HiveMessage hiveMessage = new HiveMessage(message); + if(LOG.isDebugEnabled()){ + LOG.debug("Received message: " + message.getClass().getName() + ", generated id: " + hiveMessage.getId() + + " sent by: " + sender() + ", recieved by" + self()); + } + + handleMessage(hiveMessage); + + if(LOG.isDebugEnabled()){ + LOG.debug("Message submitted: " + hiveMessage.getId()); + + } + } + + public abstract void handleMessage(HiveMessage hiveMessage); + + + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/853a1ce7/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/JdbcConnector.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/JdbcConnector.java b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/JdbcConnector.java new file mode 100644 index 0000000..ce58c8c --- /dev/null +++ 
b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/JdbcConnector.java @@ -0,0 +1,642 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.view.hive20.actor; + +import akka.actor.ActorRef; +import akka.actor.Cancellable; +import akka.actor.PoisonPill; +import akka.actor.Props; +import com.google.common.base.Optional; +import org.apache.ambari.view.ViewContext; +import org.apache.ambari.view.hive20.AuthParams; +import org.apache.ambari.view.hive20.ConnectionDelegate; +import org.apache.ambari.view.hive20.actor.message.Connect; +import org.apache.ambari.view.hive20.actor.message.FetchError; +import org.apache.ambari.view.hive20.actor.message.FetchResult; +import org.apache.ambari.view.hive20.actor.message.GetColumnMetadataJob; +import org.apache.ambari.view.hive20.actor.message.HiveJob; +import org.apache.ambari.view.hive20.actor.message.HiveMessage; +import org.apache.ambari.view.hive20.actor.message.ResultInformation; +import org.apache.ambari.view.hive20.actor.message.ResultNotReady; +import org.apache.ambari.view.hive20.actor.message.RunStatement; +import org.apache.ambari.view.hive20.actor.message.SQLStatementJob; +import 
org.apache.ambari.view.hive20.actor.message.job.CancelJob; +import org.apache.ambari.view.hive20.actor.message.job.ExecuteNextStatement; +import org.apache.ambari.view.hive20.actor.message.job.ExecutionFailed; +import org.apache.ambari.view.hive20.actor.message.job.Failure; +import org.apache.ambari.view.hive20.actor.message.job.NoResult; +import org.apache.ambari.view.hive20.actor.message.job.ResultSetHolder; +import org.apache.ambari.view.hive20.actor.message.job.SaveDagInformation; +import org.apache.ambari.view.hive20.actor.message.job.SaveGuidToDB; +import org.apache.ambari.view.hive20.actor.message.lifecycle.CleanUp; +import org.apache.ambari.view.hive20.actor.message.lifecycle.DestroyConnector; +import org.apache.ambari.view.hive20.actor.message.lifecycle.FreeConnector; +import org.apache.ambari.view.hive20.actor.message.lifecycle.InactivityCheck; +import org.apache.ambari.view.hive20.actor.message.lifecycle.KeepAlive; +import org.apache.ambari.view.hive20.actor.message.lifecycle.TerminateInactivityCheck; +import org.apache.ambari.view.hive20.internal.Connectable; +import org.apache.ambari.view.hive20.internal.ConnectionException; +import org.apache.ambari.view.hive20.persistence.Storage; +import org.apache.ambari.view.hive20.persistence.utils.ItemNotFound; +import org.apache.ambari.view.hive20.resources.jobs.viewJobs.Job; +import org.apache.ambari.view.hive20.resources.jobs.viewJobs.JobImpl; +import org.apache.ambari.view.hive20.utils.HiveActorConfiguration; +import org.apache.ambari.view.utils.hdfs.HdfsApi; +import org.apache.hive.jdbc.HiveConnection; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.concurrent.duration.Duration; + +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayDeque; +import java.util.Queue; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + + +/** + * Wraps one Jdbc connection per user, per instance. 
This is used to delegate execute the statements and + * creates child actors to delegate the ResultSet extraction, YARN/ATS querying for ExecuteJob info and Log Aggregation + */ +public class JdbcConnector extends HiveActor { + + private final Logger LOG = LoggerFactory.getLogger(getClass()); + + public static final String SUFFIX = "validating the login"; + + /** + * Interval for maximum inactivity allowed + */ + private final static long MAX_INACTIVITY_INTERVAL = 5 * 60 * 1000; + + /** + * Interval for maximum inactivity allowed before termination + */ + private static final long MAX_TERMINATION_INACTIVITY_INTERVAL = 10 * 60 * 1000; + + private static final long MILLIS_IN_SECOND = 1000L; + + private final Storage storage; + + /** + * Keeps track of the timestamp when the last activity has happened. This is + * used to calculate the inactivity period and take lifecycle decisions based + * on it. + */ + private long lastActivityTimestamp; + + /** + * Akka scheduler to tick at an interval to deal with inactivity of this actor + */ + private Cancellable inactivityScheduler; + + /** + * Akka scheduler to tick at an interval to deal with the inactivity after which + * the actor should be killed and connection should be released + */ + private Cancellable terminateActorScheduler; + + private Connectable connectable = null; + private final ActorRef deathWatch; + private final ConnectionDelegate connectionDelegate; + private final ActorRef parent; + private ActorRef statementExecutor = null; + private final HdfsApi hdfsApi; + private final AuthParams authParams; + + /** + * true if the actor is currently executing any job. + */ + private boolean executing = false; + private HiveJob.Type executionType = HiveJob.Type.SYNC; + + /** + * Returns the timeout configurations. 
+ */ + private final HiveActorConfiguration actorConfiguration; + private String username; + private Optional<String> jobId = Optional.absent(); + private Optional<String> logFile = Optional.absent(); + private int statementsCount = 0; + + private ActorRef commandSender = null; + + private ActorRef resultSetIterator = null; + private boolean isFailure = false; + private Failure failure = null; + private boolean isCancelCalled = false; + + /** + * For every execution, this will hold the statements that are left to execute + */ + private Queue<String> statementQueue = new ArrayDeque<>(); + + public JdbcConnector(ViewContext viewContext, ActorRef parent, ActorRef deathWatch, HdfsApi hdfsApi, + ConnectionDelegate connectionDelegate, Storage storage) { + this.hdfsApi = hdfsApi; + this.parent = parent; + this.deathWatch = deathWatch; + this.connectionDelegate = connectionDelegate; + this.storage = storage; + this.lastActivityTimestamp = System.currentTimeMillis(); + resultSetIterator = null; + + authParams = new AuthParams(viewContext); + actorConfiguration = new HiveActorConfiguration(viewContext); + } + + @Override + public void handleMessage(HiveMessage hiveMessage) { + Object message = hiveMessage.getMessage(); + if (message instanceof InactivityCheck) { + checkInactivity(); + } else if (message instanceof TerminateInactivityCheck) { + checkTerminationInactivity(); + } else if (message instanceof KeepAlive) { + keepAlive(); + } else if (message instanceof CleanUp) { + cleanUp(); + } else { + handleNonLifecycleMessage(hiveMessage); + } + } + + private void handleNonLifecycleMessage(HiveMessage hiveMessage) { + Object message = hiveMessage.getMessage(); + keepAlive(); + if (message instanceof Connect) { + connect((Connect) message); + } else if (message instanceof SQLStatementJob) { + runStatementJob((SQLStatementJob) message); + } else if (message instanceof GetColumnMetadataJob) { + runGetMetaData((GetColumnMetadataJob) message); + } else if (message instanceof 
ExecuteNextStatement) { + executeNextStatement(); + } else if (message instanceof ResultInformation) { + gotResultBack((ResultInformation) message); + } else if (message instanceof CancelJob) { + cancelJob((CancelJob) message); + } else if (message instanceof FetchResult) { + fetchResult((FetchResult) message); + } else if (message instanceof FetchError) { + fetchError((FetchError) message); + } else if (message instanceof SaveGuidToDB) { + saveGuid((SaveGuidToDB) message); + } else if (message instanceof SaveDagInformation) { + saveDagInformation((SaveDagInformation) message); + } else { + unhandled(message); + } + } + + private void fetchError(FetchError message) { + if (isFailure) { + sender().tell(Optional.of(failure), self()); + return; + } + sender().tell(Optional.absent(), self()); + } + + private void fetchResult(FetchResult message) { + if (isFailure) { + sender().tell(failure, self()); + return; + } + + if (executing) { + sender().tell(new ResultNotReady(jobId.get(), username), self()); + return; + } + sender().tell(Optional.fromNullable(resultSetIterator), self()); + } + + private void cancelJob(CancelJob message) { + if (!executing || connectionDelegate == null) { + LOG.error("Cannot cancel job for user as currently the job is not running or started. JobId: {}", message.getJobId()); + return; + } + LOG.info("Cancelling job for user. JobId: {}, user: {}", message.getJobId(), username); + try { + isCancelCalled = true; + connectionDelegate.cancel(); + } catch (SQLException e) { + LOG.error("Failed to cancel job. JobId: {}. 
{}", message.getJobId(), e); + } + } + + private void gotResultBack(ResultInformation message) { + Optional<Failure> failureOptional = message.getFailure(); + if (failureOptional.isPresent()) { + Failure failure = failureOptional.get(); + processFailure(failure); + return; + } + if (statementQueue.size() == 0) { + // This is the last resultSet + processResult(message.getResultSet()); + } else { + self().tell(new ExecuteNextStatement(), self()); + } + } + + private void processCancel() { + executing = false; + if (isAsync() && jobId.isPresent()) { + LOG.error("Job canceled by user for JobId: {}", jobId.get()); + updateJobStatus(jobId.get(), Job.JOB_STATE_CANCELED); + } + } + + private void processFailure(Failure failure) { + executing = false; + isFailure = true; + this.failure = failure; + if (isAsync() && jobId.isPresent()) { + if(isCancelCalled) { + processCancel(); + return; + } + updateJobStatus(jobId.get(), Job.JOB_STATE_ERROR); + } else { + // Send for sync execution + commandSender.tell(new ExecutionFailed(failure.getMessage(), failure.getError()), self()); + cleanUpWithTermination(); + } + } + + private void processResult(Optional<ResultSet> resultSetOptional) { + executing = false; + + LOG.info("Finished processing SQL statements for Job id : {}", jobId.or("SYNC JOB")); + if (isAsync() && jobId.isPresent()) { + updateJobStatus(jobId.get(), Job.JOB_STATE_FINISHED); + } + + if (resultSetOptional.isPresent()) { + ActorRef resultSetActor = getContext().actorOf(Props.create(ResultSetIterator.class, self(), + resultSetOptional.get(), isAsync()).withDispatcher("akka.actor.result-dispatcher"), + "ResultSetIterator:" + UUID.randomUUID().toString()); + resultSetIterator = resultSetActor; + if (!isAsync()) { + commandSender.tell(new ResultSetHolder(resultSetActor), self()); + } + } else { + resultSetIterator = null; + if (!isAsync()) { + commandSender.tell(new NoResult(), self()); + } + } + } + + private void executeNextStatement() { + if (statementQueue.isEmpty()) { 
+ jobExecutionCompleted(); + return; + } + + int index = statementsCount - statementQueue.size(); + String statement = statementQueue.poll(); + if (statementExecutor == null) { + statementExecutor = getStatementExecutor(); + } + + if (isAsync()) { + statementExecutor.tell(new RunStatement(index, statement, jobId.get(), true, logFile.get(), true), self()); + } else { + statementExecutor.tell(new RunStatement(index, statement), self()); + } + } + + private void runStatementJob(SQLStatementJob message) { + executing = true; + jobId = message.getJobId(); + logFile = message.getLogFile(); + executionType = message.getType(); + commandSender = getSender(); + + resetToInitialState(); + + if (!checkConnection()) return; + + for (String statement : message.getStatements()) { + statementQueue.add(statement); + } + statementsCount = statementQueue.size(); + + if (isAsync() && jobId.isPresent()) { + updateJobStatus(jobId.get(), Job.JOB_STATE_RUNNING); + startInactivityScheduler(); + } + self().tell(new ExecuteNextStatement(), self()); + } + + public boolean checkConnection() { + if (connectable == null) { + notifyConnectFailure(new SQLException("Hive connection is not created")); + return false; + } + + Optional<HiveConnection> connectionOptional = connectable.getConnection(); + if (!connectionOptional.isPresent()) { + SQLException sqlException = connectable.isUnauthorized() ? 
new SQLException("Hive Connection not Authorized", "AUTHFAIL") + : new SQLException("Hive connection is not created"); + notifyConnectFailure(sqlException); + return false; + } + return true; + } + + private void runGetMetaData(GetColumnMetadataJob message) { + if (!checkConnection()) return; + resetToInitialState(); + executing = true; + executionType = message.getType(); + commandSender = getSender(); + statementExecutor = getStatementExecutor(); + statementExecutor.tell(message, self()); + } + + private ActorRef getStatementExecutor() { + return getContext().actorOf(Props.create(StatementExecutor.class, hdfsApi, storage, connectable.getConnection().get(), connectionDelegate) + .withDispatcher("akka.actor.result-dispatcher"), + "StatementExecutor:" + UUID.randomUUID().toString()); + } + + private boolean isAsync() { + return executionType == HiveJob.Type.ASYNC; + } + + private void notifyConnectFailure(Exception ex) { + executing = false; + isFailure = true; + this.failure = new Failure("Cannot connect to hive", ex); + if (isAsync()) { + updateJobStatus(jobId.get(), Job.JOB_STATE_ERROR); + + if(ex instanceof ConnectionException){ + ConnectionException connectionException = (ConnectionException) ex; + Throwable cause = connectionException.getCause(); + if(cause instanceof SQLException){ + SQLException sqlException = (SQLException) cause; + if(isLoginError(sqlException)) + return; + } + } + + } else { + sender().tell(new ExecutionFailed("Cannot connect to hive"), ActorRef.noSender()); + } + // Do not clean up in case of failed authorizations + // The failure is bubbled to the user for requesting credentials + + if (!(ex instanceof SQLException) || !((SQLException) ex).getSQLState().equals("AUTHFAIL")) { + cleanUpWithTermination(); + } + } + + private boolean isLoginError(SQLException ce) { + return ce.getCause().getMessage().toLowerCase().endsWith(SUFFIX); + } + + private void keepAlive() { + lastActivityTimestamp = System.currentTimeMillis(); + } + + private void 
jobExecutionCompleted() { + // Set is executing as false so that the inactivity checks can finish cleanup + // after timeout + LOG.info("Job execution completed for user: {}. Results are ready to be fetched", username); + this.executing = false; + } + + protected Optional<String> getUsername() { + return Optional.fromNullable(username); + } + + private void connect(Connect message) { + username = message.getUsername(); + jobId = message.getJobId(); + executionType = message.getType(); + // check the connectable + if (connectable == null) { + connectable = message.getConnectable(authParams); + } + // make the connectable to Hive + try { + if (!connectable.isOpen()) { + connectable.connect(); + } + } catch (ConnectionException e) { + LOG.error("Failed to create a hive connection. {}", e); + // set up job failure + // notify parent about job failure + notifyConnectFailure(e); + return; + } + startTerminateInactivityScheduler(); + } + + private void updateJobStatus(String jobid, final String status) { + new JobSaver(jobid) { + @Override + protected void update(JobImpl job) { + job.setStatus(status); + job.setDuration(getUpdatedDuration(job.getDateSubmitted())); + } + }.save(); + LOG.info("Stored job status for Job id: {} as '{}'", jobid, status); + } + + private void saveGuid(final SaveGuidToDB message) { + new JobSaver(message.getJobId()) { + @Override + protected void update(JobImpl job) { + job.setGuid(message.getGuid()); + } + }.save(); + LOG.info("Stored GUID for Job id: {} as '{}'", message.getJobId(), message.getGuid()); + } + + private void saveDagInformation(final SaveDagInformation message) { + if(message.getDagId() == null && + message.getDagName() == null && + message.getApplicationId() == null) { + LOG.error("Cannot save Dag Information for job Id: {} as all the properties are null.", message.getJobId()); + return; + } + new JobSaver(message.getJobId()) { + + @Override + protected void update(JobImpl job) { + if (message.getApplicationId() != null) { + 
job.setApplicationId(message.getApplicationId()); + } + if (message.getDagId() != null) { + job.setDagId(message.getDagId()); + } + if(message.getDagName() != null) { + job.setDagName(message.getDagName()); + } + } + }.save(); + LOG.info("Store Dag Information for job. Job id: {}, dagName: {}, dagId: {}, applicationId: {}", message.getJobId(), message.getDagName(), message.getDagId(), message.getApplicationId()); + } + + private Long getUpdatedDuration(Long dateSubmitted) { + return (System.currentTimeMillis() / MILLIS_IN_SECOND) - (dateSubmitted / MILLIS_IN_SECOND); + } + + + private void checkInactivity() { + LOG.debug("Inactivity check, executing status: {}", executing); + if (executing) { + keepAlive(); + return; + } + long current = System.currentTimeMillis(); + if ((current - lastActivityTimestamp) > actorConfiguration.getInactivityTimeout(MAX_INACTIVITY_INTERVAL)) { + // Stop all the sub-actors created + cleanUp(); + } + } + + private void checkTerminationInactivity() { + if (!isAsync()) { + // Should not terminate if job is sync. Will terminate after the job is finished. 
+ stopTerminateInactivityScheduler(); + return; + } + + LOG.debug("Termination check, executing status: {}", executing); + if (executing) { + keepAlive(); + return; + } + + long current = System.currentTimeMillis(); + if ((current - lastActivityTimestamp) > actorConfiguration.getTerminationTimeout(MAX_TERMINATION_INACTIVITY_INTERVAL)) { + cleanUpWithTermination(); + } + } + + private void cleanUp() { + if (jobId.isPresent()) { + LOG.debug("{} :: Cleaning up resources for inactivity for jobId: {}", self().path().name(), jobId.get()); + } else { + LOG.debug("{} ::Cleaning up resources with inactivity for Sync execution.", self().path().name()); + } + this.executing = false; + cleanUpStatementAndResultSet(); + stopInactivityScheduler(); + parent.tell(new FreeConnector(username, jobId.orNull(), isAsync()), self()); + } + + private void cleanUpWithTermination() { + this.executing = false; + LOG.debug("{} :: Cleaning up resources with inactivity for execution.", self().path().name()); + cleanUpStatementAndResultSet(); + + stopInactivityScheduler(); + stopTerminateInactivityScheduler(); + parent.tell(new DestroyConnector(username, jobId.orNull(), isAsync()), this.self()); + self().tell(PoisonPill.getInstance(), ActorRef.noSender()); + } + + + private void cleanUpStatementAndResultSet() { + connectionDelegate.closeStatement(); + connectionDelegate.closeResultSet(); + } + + private void startTerminateInactivityScheduler() { + this.terminateActorScheduler = getContext().system().scheduler().schedule( + Duration.Zero(), Duration.create(60 * 1000, TimeUnit.MILLISECONDS), + this.getSelf(), new TerminateInactivityCheck(), getContext().dispatcher(), null); + } + + private void stopTerminateInactivityScheduler() { + if (!(terminateActorScheduler == null || terminateActorScheduler.isCancelled())) { + terminateActorScheduler.cancel(); + } + } + + private void startInactivityScheduler() { + if (inactivityScheduler != null) { + inactivityScheduler.cancel(); + } + inactivityScheduler = 
getContext().system().scheduler().schedule( + Duration.Zero(), Duration.create(15 * 1000, TimeUnit.MILLISECONDS), + this.self(), new InactivityCheck(), getContext().dispatcher(), null); + } + + private void stopInactivityScheduler() { + if (!(inactivityScheduler == null || inactivityScheduler.isCancelled())) { + inactivityScheduler.cancel(); + } + } + + private void resetToInitialState() { + isFailure = false; + failure = null; + resultSetIterator = null; + isCancelCalled = false; + statementQueue = new ArrayDeque<>(); + } + + @Override + public void postStop() throws Exception { + stopInactivityScheduler(); + stopTerminateInactivityScheduler(); + + if (connectable.isOpen()) { + connectable.disconnect(); + } + } + + /** + * Saves the job to database. + */ + private abstract class JobSaver { + private final String jobId; + + JobSaver(String jobId) { + this.jobId = jobId; + } + + public void save() { + try { + JobImpl job = storage.load(JobImpl.class, jobId); + update(job); + storage.store(JobImpl.class, job); + } catch (ItemNotFound itemNotFound) { + itemNotFound(jobId); + } + } + + /** + * Override to handle Not found exception + */ + private void itemNotFound(String jobId) { + // Nothing to do + } + + protected abstract void update(JobImpl job); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/853a1ce7/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/LogAggregator.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/LogAggregator.java b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/LogAggregator.java new file mode 100644 index 0000000..f9c21b4 --- /dev/null +++ b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/LogAggregator.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.view.hive20.actor; + +import akka.actor.ActorRef; +import akka.actor.Cancellable; +import com.google.common.base.Joiner; +import org.apache.ambari.view.hive20.actor.message.GetMoreLogs; +import org.apache.ambari.view.hive20.actor.message.HiveMessage; +import org.apache.ambari.view.hive20.actor.message.LogAggregationFinished; +import org.apache.ambari.view.hive20.actor.message.StartLogAggregation; +import org.apache.ambari.view.utils.hdfs.HdfsApi; +import org.apache.ambari.view.utils.hdfs.HdfsApiException; +import org.apache.ambari.view.utils.hdfs.HdfsUtil; +import org.apache.hive.jdbc.HiveStatement; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.concurrent.duration.Duration; + +import java.sql.SQLException; +import java.util.List; +import java.util.concurrent.TimeUnit; + +/** + * Reads the logs for a ExecuteJob from the Statement and writes them into hdfs. 
+ */ +public class LogAggregator extends HiveActor { + + private final Logger LOG = LoggerFactory.getLogger(getClass()); + + public static final int AGGREGATION_INTERVAL = 5 * 1000; + private final HdfsApi hdfsApi; + private final HiveStatement statement; + private final String logFile; + + private Cancellable moreLogsScheduler; + private ActorRef parent; + private boolean hasStartedFetching = false; + private boolean shouldFetchMore = true; + + public LogAggregator(HdfsApi hdfsApi, HiveStatement statement, String logFile) { + this.hdfsApi = hdfsApi; + this.statement = statement; + this.logFile = logFile; + } + + @Override + public void handleMessage(HiveMessage hiveMessage) { + Object message = hiveMessage.getMessage(); + if (message instanceof StartLogAggregation) { + start(); + } + + if (message instanceof GetMoreLogs) { + try { + getMoreLogs(); + } catch (SQLException e) { + LOG.error("SQL Error while getting logs. Tried writing to: {}", logFile); + } catch (HdfsApiException e) { + LOG.warn("HDFS Error while getting writing logs to {}", logFile); + + } + } + } + + private void start() { + parent = this.getSender(); + hasStartedFetching = false; + shouldFetchMore = true; + if (!(moreLogsScheduler == null || moreLogsScheduler.isCancelled())) { + moreLogsScheduler.cancel(); + } + this.moreLogsScheduler = getContext().system().scheduler().schedule( + Duration.Zero(), Duration.create(AGGREGATION_INTERVAL, TimeUnit.MILLISECONDS), + this.getSelf(), new GetMoreLogs(), getContext().dispatcher(), null); + } + + private void getMoreLogs() throws SQLException, HdfsApiException { + List<String> logs = statement.getQueryLog(); + if (logs.size() > 0 && shouldFetchMore) { + String allLogs = Joiner.on("\n").skipNulls().join(logs); + HdfsUtil.putStringToFile(hdfsApi, logFile, allLogs); + if(!statement.hasMoreLogs()) { + shouldFetchMore = false; + } + } else { + // Cancel the timer only when log fetching has been started + if(!shouldFetchMore) { + moreLogsScheduler.cancel(); + 
parent.tell(new LogAggregationFinished(), ActorRef.noSender()); + } + } + } + + @Override + public void postStop() throws Exception { + if (moreLogsScheduler != null && !moreLogsScheduler.isCancelled()) { + moreLogsScheduler.cancel(); + } + + } + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/853a1ce7/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/MetaDataManager.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/MetaDataManager.java b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/MetaDataManager.java new file mode 100644 index 0000000..d63b3a0 --- /dev/null +++ b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/MetaDataManager.java @@ -0,0 +1,134 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ambari.view.hive20.actor; + +import akka.actor.ActorRef; +import akka.actor.Cancellable; +import akka.actor.Props; +import org.apache.ambari.view.ViewContext; +import org.apache.ambari.view.hive20.actor.message.HiveMessage; +import org.apache.ambari.view.hive20.actor.message.Ping; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.concurrent.duration.Duration; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +/** + * Manages the Meta Information for Hive Server. Singleton actor which stores several DatabaseManagerActor in memory for + * each user and instance name combination. + */ +public class MetaDataManager extends HiveActor { + + private final Logger LOG = LoggerFactory.getLogger(getClass()); + + /** + * Stores the sub database manager actors per user combination + */ + private final Map<String, ActorRef> databaseManagers = new HashMap<>(); + private final Map<String, Cancellable> terminationSchedulers = new HashMap<>(); + private final ViewContext context; + + public MetaDataManager(ViewContext context) { + this.context = context; + } + + @Override + public void handleMessage(HiveMessage hiveMessage) { + + Object message = hiveMessage.getMessage(); + if (message instanceof Ping) { + handlePing((Ping) message); + } else if (message instanceof Terminate) { + handleTerminate((Terminate) message); + } else if (message instanceof DatabaseManager.GetDatabases) { + handleGetDatabases((DatabaseManager.GetDatabases) message); + } + } + + private void handlePing(Ping message) { + LOG.info("Ping message received for user: {}, instance: {}", message.getUsername(), message.getInstanceName()); + ActorRef databaseManager = databaseManagers.get(message.getUsername()); + if (databaseManager == null) { + databaseManager = createDatabaseManager(message.getUsername(), message.getInstanceName()); + databaseManagers.put(context.getUsername(), databaseManager); + databaseManager.tell(new 
DatabaseManager.Refresh(context.getUsername()), getSelf()); + } else { + cancelTerminationScheduler(message.getUsername()); + } + scheduleTermination(context.getUsername()); + } + + private void handleTerminate(Terminate message) { + ActorRef databaseManager = databaseManagers.remove(message.username); + getContext().stop(databaseManager); + cancelTerminationScheduler(message.getUsername()); + } + + private void handleGetDatabases(DatabaseManager.GetDatabases message) { + String username = message.getUsername(); + ActorRef databaseManager = databaseManagers.get(username); + if(databaseManager != null) { + databaseManager.tell(message, getSender()); + } else { + // Not database Manager created. Start the database manager with a ping message + // and queue up the GetDatabases call to self + getSelf().tell(new Ping(username, context.getInstanceName()), getSender()); + getSelf().tell(message, getSender()); + } + } + + private void cancelTerminationScheduler(String username) { + Cancellable cancellable = terminationSchedulers.remove(username); + if (!(cancellable == null || cancellable.isCancelled())) { + LOG.info("Cancelling termination scheduler"); + cancellable.cancel(); + } + } + + private void scheduleTermination(String username) { + Cancellable cancellable = context().system().scheduler().scheduleOnce(Duration.create(2, TimeUnit.MINUTES), + getSelf(), new Terminate(username), getContext().dispatcher(), getSelf()); + terminationSchedulers.put(username, cancellable); + } + + private ActorRef createDatabaseManager(String username, String instanceName) { + LOG.info("Creating database manager for username: {}, instance: {}", username, instanceName); + return context().actorOf(DatabaseManager.props(context)); + } + + public static Props props(ViewContext viewContext) { + return Props.create(MetaDataManager.class, viewContext); + } + + private class Terminate { + public final String username; + + public Terminate(String username) { + this.username = username; + } + + 
public String getUsername() { + return username; + } + } + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/853a1ce7/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/MetaDataRetriever.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/MetaDataRetriever.java b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/MetaDataRetriever.java new file mode 100644 index 0000000..7323a0a --- /dev/null +++ b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/MetaDataRetriever.java @@ -0,0 +1,165 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ambari.view.hive20.actor; + +import akka.actor.Props; +import com.google.common.base.Optional; +import org.apache.ambari.view.hive20.actor.message.HiveMessage; +import org.apache.ambari.view.hive20.internal.Connectable; +import org.apache.ambari.view.hive20.internal.ConnectionException; +import org.apache.ambari.view.hive20.internal.dto.DatabaseInfo; +import org.apache.ambari.view.hive20.internal.dto.TableInfo; +import org.apache.hive.jdbc.HiveConnection; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.HashSet; +import java.util.Set; + +/** + * + */ +public class MetaDataRetriever extends HiveActor { + + private final Logger LOG = LoggerFactory.getLogger(getClass()); + + private final Connectable connectable; + + public MetaDataRetriever(Connectable connectable) { + this.connectable = connectable; + } + + @Override + public void handleMessage(HiveMessage hiveMessage) { + Object message = hiveMessage.getMessage(); + if (message instanceof RefreshDB) { + handleRefreshDB(); + } + } + + private void handleRefreshDB() { + try { + refreshDatabaseInfos(); + } catch (ConnectionException | SQLException e) { + LOG.error("Failed to update the complete database information. 
Exception: {}", e); + getSender().tell(new DBRefreshFailed(e), getSelf()); + } + } + + private HiveConnection getHiveConnection() throws ConnectionException { + if (!connectable.isOpen()) { + connectable.connect(); + } + Optional<HiveConnection> connectionOptional = connectable.getConnection(); + return connectionOptional.get(); + } + + private void refreshDatabaseInfos() throws ConnectionException, SQLException { + HiveConnection connection = getHiveConnection(); + Set<DatabaseInfo> infos = new HashSet<>(); + try (ResultSet schemas = connection.getMetaData().getSchemas()) { + while (schemas.next()) { + DatabaseInfo info = new DatabaseInfo(schemas.getString(1)); + infos.add(info); + } + } + + getSender().tell(new DBRefreshed(infos), getSelf()); + + for (DatabaseInfo info : infos) { + refreshTablesInfo(info.getName()); + } + } + + private void refreshTablesInfo(String database) throws ConnectionException, SQLException { + HiveConnection connection = getHiveConnection(); + try (ResultSet tables = connection.getMetaData().getTables("", database, null, null)) { + while (tables.next()) { + TableInfo info = new TableInfo(tables.getString(3), tables.getString(4)); + getSender().tell(new TableRefreshed(info, database), getSelf()); + } + } + getSender().tell(new AllTableRefreshed(database), getSelf()); + } + + public static Props props(Connectable connectable) { + return Props.create(MetaDataRetriever.class, connectable); + } + + + public static class RefreshDB { + + } + + public static class DBRefreshed { + private final Set<DatabaseInfo> databases; + + public DBRefreshed(Set<DatabaseInfo> databases) { + this.databases = databases; + } + + public Set<DatabaseInfo> getDatabases() { + return databases; + } + } + + public static class DBRefreshFailed { + private final Exception exception; + + public DBRefreshFailed(Exception exception) { + this.exception = exception; + } + + public Exception getException() { + return exception; + } + } + + public static class TableRefreshed { 
+ private final TableInfo table; + private final String database; + + public TableRefreshed(TableInfo table, String database) { + this.table = table; + this.database = database; + } + + public TableInfo getTable() { + return table; + } + + public String getDatabase() { + return database; + } + } + + public static class AllTableRefreshed { + private final String database; + + public AllTableRefreshed(String database) { + this.database = database; + } + + public String getDatabase() { + return database; + } + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/853a1ce7/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/OperationController.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/OperationController.java b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/OperationController.java new file mode 100644 index 0000000..f751d8f --- /dev/null +++ b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/OperationController.java @@ -0,0 +1,408 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ambari.view.hive20.actor; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.Props; +import com.google.common.base.Optional; +import org.apache.ambari.view.ViewContext; +import org.apache.ambari.view.hive20.ConnectionDelegate; +import org.apache.ambari.view.hive20.actor.message.Connect; +import org.apache.ambari.view.hive20.actor.message.ExecuteJob; +import org.apache.ambari.view.hive20.actor.message.FetchError; +import org.apache.ambari.view.hive20.actor.message.FetchResult; +import org.apache.ambari.view.hive20.actor.message.HiveJob; +import org.apache.ambari.view.hive20.actor.message.HiveMessage; +import org.apache.ambari.view.hive20.actor.message.JobRejected; +import org.apache.ambari.view.hive20.actor.message.RegisterActor; +import org.apache.ambari.view.hive20.actor.message.SQLStatementJob; +import org.apache.ambari.view.hive20.actor.message.job.CancelJob; +import org.apache.ambari.view.hive20.actor.message.job.FetchFailed; +import org.apache.ambari.view.hive20.actor.message.job.SaveDagInformation; +import org.apache.ambari.view.hive20.actor.message.lifecycle.DestroyConnector; +import org.apache.ambari.view.hive20.actor.message.lifecycle.FreeConnector; +import org.apache.ambari.view.hive20.internal.ContextSupplier; +import org.apache.ambari.view.hive20.persistence.Storage; +import org.apache.ambari.view.hive20.utils.LoggingOutputStream; +import org.apache.ambari.view.utils.hdfs.HdfsApi; +import org.apache.commons.collections4.MapUtils; +import org.apache.commons.collections4.map.HashedMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.PrintStream; +import java.util.HashMap; +import java.util.LinkedHashSet; +import java.util.LinkedList; +import java.util.Map; +import java.util.Queue; +import java.util.Set; +import java.util.UUID; + +/** + * Router actor to control the operations. 
This delegates the operations to underlying child actors and + * store the state for them. + */ +public class OperationController extends HiveActor { + + private final Logger LOG = LoggerFactory.getLogger(getClass()); + + private final ActorSystem system; + private final ActorRef deathWatch; + private final ContextSupplier<ConnectionDelegate> connectionSupplier; + private final ContextSupplier<Storage> storageSupplier; + private final ContextSupplier<Optional<HdfsApi>> hdfsApiSupplier; + + /** + * Store the connection per user which are currently not working + */ + private final Map<String, Queue<ActorRef>> asyncAvailableConnections; + + /** + * Store the connection per user which are currently not working + */ + private final Map<String, Queue<ActorRef>> syncAvailableConnections; + + + /** + * Store the connection per user/per job which are currently working. + */ + private final Map<String, Map<String, ActorRef>> asyncBusyConnections; + + /** + * Store the connection per user which will be used to execute sync jobs + * like fetching databases, tables etc. 
*/
  private final Map<String, Set<ActorRef>> syncBusyConnections;

  private final ViewContext context;

  /**
   * Creates the controller with empty connector pools.
   *
   * @param system             actor system used to spawn {@code JdbcConnector} actors
   * @param deathWatch         actor that every newly created connector is registered with
   * @param context            view context handed to the suppliers and to each connector
   * @param connectionSupplier supplies the connection delegate for a new connector
   * @param storageSupplier    supplies the storage for a new connector
   * @param hdfsApiSupplier    supplies the (optional) HDFS API for a new connector
   */
  public OperationController(ActorSystem system,
                             ActorRef deathWatch,
                             ViewContext context,
                             ContextSupplier<ConnectionDelegate> connectionSupplier,
                             ContextSupplier<Storage> storageSupplier,
                             ContextSupplier<Optional<HdfsApi>> hdfsApiSupplier) {
    this.system = system;
    this.deathWatch = deathWatch;
    this.context = context;
    this.connectionSupplier = connectionSupplier;
    this.storageSupplier = storageSupplier;
    this.hdfsApiSupplier = hdfsApiSupplier;
    this.asyncAvailableConnections = new HashMap<>();
    this.syncAvailableConnections = new HashMap<>();
    // java.util.HashMap, like the three sibling pools (was commons-collections HashedMap).
    this.asyncBusyConnections = new HashMap<>();
    this.syncBusyConnections = new HashMap<>();
  }

  /**
   * Dispatches the payload of the incoming message to the matching handler.
   * Payload types that are not listed here are ignored, as is an
   * {@code ExecuteJob} whose job type is neither ASYNC nor SYNC.
   *
   * @param hiveMessage envelope whose payload selects the handler
   */
  @Override
  public void handleMessage(HiveMessage hiveMessage) {
    Object message = hiveMessage.getMessage();

    if (message instanceof ExecuteJob) {
      ExecuteJob job = (ExecuteJob) message;
      if (job.getJob().getType() == HiveJob.Type.ASYNC) {
        sendJob(job.getConnect(), (SQLStatementJob) job.getJob());
      } else if (job.getJob().getType() == HiveJob.Type.SYNC) {
        sendSyncJob(job.getConnect(), job.getJob());
      }
    }

    if (message instanceof CancelJob) {
      cancelJob((CancelJob) message);
    }

    if (message instanceof FetchResult) {
      fetchResultActorRef((FetchResult) message);
    }

    if (message instanceof FetchError) {
      fetchError((FetchError) message);
    }

    if (message instanceof FreeConnector) {
      freeConnector((FreeConnector) message);
    }

    if (message instanceof DestroyConnector) {
      destroyConnector((DestroyConnector) message);
    }

    if (message instanceof SaveDagInformation) {
      saveDagInformation((SaveDagInformation) message);
    }
  }

  /**
   * Looks up the connector currently registered as busy for the given user and job.
   *
   * @return the connector, or {@code null} when the user has no busy async
   *         connectors or none is registered under {@code jobId}
   */
  private ActorRef findBusyAsyncConnector(String username, String jobId) {
    Map<String, ActorRef> connectorsByJob = asyncBusyConnections.get(username);
    // A user without any entry used to cause a NullPointerException here.
    return connectorsByJob == null ? null : connectorsByJob.get(jobId);
  }

  /**
   * Forwards the cancel request to the connector running the job, or replies
   * with {@code FetchFailed} when no such connector is registered.
   */
  private void cancelJob(CancelJob message) {
    ActorRef actorRef = findBusyAsyncConnector(message.getUsername(), message.getJobId());
    if (actorRef != null) {
      actorRef.tell(message, sender());
    } else {
      String msg = String.format("Cannot cancel job. Job with id: %s for instance: %s has either not started or has expired.", message.getJobId(), context.getInstanceName());
      LOG.error(msg);
      sender().tell(new FetchFailed(msg), self());
    }
  }

  /**
   * Forwards the DAG information to the connector running the job. The user is
   * taken from the view context, not from the message. When no connector is
   * registered the failure is only logged; no reply is sent.
   */
  private void saveDagInformation(SaveDagInformation message) {
    ActorRef jdbcConnection = findBusyAsyncConnector(context.getUsername(), message.getJobId());
    if (jdbcConnection != null) {
      jdbcConnection.tell(message, sender());
    } else {
      String msg = String.format("Cannot update Dag Information for job. Job with id: %s for instance: %s has either not started or has expired.", message.getJobId(), context.getInstanceName());
      LOG.error(msg);
    }
  }

  /**
   * Forwards the error-fetch request to the connector running the job, or
   * replies with {@code FetchFailed} when no such connector is registered.
   */
  private void fetchError(FetchError message) {
    ActorRef actorRef = findBusyAsyncConnector(message.getUsername(), message.getJobId());
    if (actorRef != null) {
      actorRef.tell(message, sender());
    } else {
      String msg = String.format("Cannot fetch error for job. Job with id: %s for instance: %s has either not started or has expired.", message.getJobId(), context.getInstanceName());
      LOG.error(msg);
      sender().tell(new FetchFailed(msg), self());
    }
  }

  /**
   * Forwards the result-fetch request to the connector running the job, or
   * replies with {@code FetchFailed} when no such connector is registered.
   */
  private void fetchResultActorRef(FetchResult message) {
    ActorRef actorRef = findBusyAsyncConnector(message.getUsername(), message.getJobId());
    if (actorRef != null) {
      actorRef.tell(message, sender());
    } else {
      String msg = String.format("Cannot fetch result for job. Job with id: %s for instance: %s has either not started or has expired.", message.getJobId(), context.getInstanceName());
      LOG.error(msg);
      sender().tell(new FetchFailed(msg), self());
    }
  }

  /**
   * Starts an asynchronous job on a pooled connector, creating a new connector
   * when the user's pool is empty, and records the connector as busy under the
   * job id.
   * <p>
   * The job is rejected with {@code JobRejected} (and nothing is forwarded)
   * when a job with the same id is already registered for the user, or when a
   * new connector is needed but the HDFS API is unavailable.
   */
  private void sendJob(Connect connect, SQLStatementJob job) {
    String username = job.getUsername();
    String jobId = job.getJobId().get();

    // Check for a duplicate before taking a connector from the pool, so that a
    // rejected request neither consumes a connector nor reaches the running one.
    Map<String, ActorRef> busyActors = asyncBusyConnections.get(username);
    if (busyActors != null && busyActors.containsKey(jobId)) {
      // Reject this as with the same jobId one connection is already in progress.
      sender().tell(new JobRejected(username, jobId, "Existing job in progress with same jobId."), ActorRef.noSender());
      return;
    }

    // Check if there is available actors to process this
    ActorRef subActor = getActorRefFromAsyncPool(username);
    if (subActor == null) {
      Optional<HdfsApi> hdfsApiOptional = hdfsApiSupplier.get(context);
      if (!hdfsApiOptional.isPresent()) {
        // NOTE(review): the sync path reports "Failed to connect to HDFS." for the
        // same condition; text kept unchanged here in case callers depend on it.
        sender().tell(new JobRejected(username, jobId, "Failed to connect to Hive."), self());
        return;
      }
      HdfsApi hdfsApi = hdfsApiOptional.get();

      subActor = system.actorOf(
        Props.create(JdbcConnector.class, context, self(),
          deathWatch, hdfsApi, connectionSupplier.get(context),
          storageSupplier.get(context)).withDispatcher("akka.actor.jdbc-connector-dispatcher"),
        UUID.randomUUID().toString() + ":asyncjdbcConnector");
      deathWatch.tell(new RegisterActor(subActor), self());
    }

    if (busyActors == null) {
      busyActors = new HashMap<>();
      asyncBusyConnections.put(username, busyActors);
    }
    busyActors.put(jobId, subActor);

    // set up the connect with ExecuteJob id for terminations
    subActor.tell(connect, self());
    subActor.tell(job, self());
  }

  /** Takes an idle sync connector for the user, or returns {@code null} when none is idle. */
  private ActorRef getActorRefFromSyncPool(String username) {
    return getActorRefFromPool(syncAvailableConnections, username);
  }

  /** Takes an idle async connector for the user, or returns {@code null} when none is idle. */
  private ActorRef getActorRefFromAsyncPool(String username) {
    return getActorRefFromPool(asyncAvailableConnections, username);
  }

  /**
   * Removes and returns the head of the user's idle queue in {@code pool}.
   * A user seen for the first time gets an empty queue registered.
   *
   * @return an idle connector, or {@code null} when the user has none
   */
  private ActorRef getActorRefFromPool(Map<String, Queue<ActorRef>> pool, String username) {
    Queue<ActorRef> availableActors = pool.get(username);
    if (availableActors == null) {
      pool.put(username, new LinkedList<ActorRef>());
      return null;
    }
    // poll() yields null on an empty queue, which is the "none idle" result.
    return availableActors.poll();
  }

  /**
   * Starts a synchronous job on a pooled connector, creating a new connector
   * when the user's pool is empty, and records the connector as busy for the
   * user. The job is rejected with {@code JobRejected} when a new connector is
   * needed but the HDFS API is unavailable.
   */
  private void sendSyncJob(Connect connect, HiveJob job) {
    String username = job.getUsername();
    // Check if there is available actors to process this
    ActorRef subActor = getActorRefFromSyncPool(username);

    if (subActor == null) {
      Optional<HdfsApi> hdfsApiOptional = hdfsApiSupplier.get(context);
      if (!hdfsApiOptional.isPresent()) {
        sender().tell(new JobRejected(username, ExecuteJob.SYNC_JOB_MARKER, "Failed to connect to HDFS."), ActorRef.noSender());
        return;
      }
      HdfsApi hdfsApi = hdfsApiOptional.get();

      subActor = system.actorOf(
        Props.create(JdbcConnector.class, context, self(),
          deathWatch, hdfsApi, connectionSupplier.get(context),
          storageSupplier.get(context)).withDispatcher("akka.actor.jdbc-connector-dispatcher"),
        UUID.randomUUID().toString() + ":syncjdbcConnector");
      deathWatch.tell(new RegisterActor(subActor), self());
    }

    Set<ActorRef> busyActors = syncBusyConnections.get(username);
    if (busyActors == null) {
      busyActors = new LinkedHashSet<>();
      syncBusyConnections.put(username, busyActors);
    }
    busyActors.add(subActor);

    // Termination requires that the ref is known in case of sync jobs
    subActor.tell(connect, sender());
    subActor.tell(job, sender());
  }

  /**
   * Drops the connector from both the busy and the idle pool of its kind.
   * For async connectors the busy entry is identified by job id and the idle
   * entry by the message sender; for sync connectors both use the sender.
   */
  private void destroyConnector(DestroyConnector message) {
    ActorRef sender = getSender();
    if (message.isForAsync()) {
      removeFromAsyncBusyPool(message.getUsername(), message.getJobId());
      removeFromASyncAvailable(message.getUsername(), sender);
    } else {
      removeFromSyncBusyPool(message.getUsername(), sender);
      removeFromSyncAvailable(message.getUsername(), sender);
    }
    logMaps();
  }

  /**
   * Moves a connector from the busy pool back to the idle pool of its kind and
   * then logs the pool state (for both kinds, matching {@code destroyConnector}).
   */
  private void freeConnector(FreeConnector message) {
    String username = message.getUsername();
    if (message.isForAsync()) {
      LOG.info("About to free connector for job {} and user {}", message.getJobId(), username);
      Optional<ActorRef> refOptional = removeFromAsyncBusyPool(username, message.getJobId());
      if (refOptional.isPresent()) {
        addToAsyncAvailable(username, refOptional.get());
      }
    } else {
      // Was a sync job, remove from sync pool
      LOG.info("About to free sync connector for user {}", username);
      Optional<ActorRef> refOptional = removeFromSyncBusyPool(username, getSender());
      if (refOptional.isPresent()) {
        addToSyncAvailable(username, refOptional.get());
      }
    }

    logMaps();
  }

  /** Dumps the four connector pools to the debug log; does nothing when debug logging is off. */
  private void logMaps() {
    if (!LOG.isDebugEnabled()) {
      return;
    }
    LOG.debug("Pool status");
    try (LoggingOutputStream out = new LoggingOutputStream(LOG, LoggingOutputStream.LogLevel.DEBUG)) {
      PrintStream printer = new PrintStream(out);
      MapUtils.debugPrint(printer, "Busy Async connections", asyncBusyConnections);
      MapUtils.debugPrint(printer, "Available Async connections", asyncAvailableConnections);
      MapUtils.debugPrint(printer, "Busy Sync connections", syncBusyConnections);
      MapUtils.debugPrint(printer, "Available Sync connections", syncAvailableConnections);
      printer.flush();
    } catch (IOException e) {
      LOG.warn("Cannot close Logging output stream, this may lead to leaks", e);
    }
  }

  /**
   * Removes {@code refToFree} from the user's busy sync connectors.
   *
   * @return always {@code Optional.of(refToFree)}, whether or not the connector
   *         was actually registered as busy
   */
  private Optional<ActorRef> removeFromSyncBusyPool(String userName, ActorRef refToFree) {
    Set<ActorRef> actorRefs = syncBusyConnections.get(userName);
    if (actorRefs != null) {
      actorRefs.remove(refToFree);
    }
    // NOTE(review): callers re-pool the ref even if it was not busy — confirm this is intended.
    return Optional.of(refToFree);
  }

  /**
   * Removes the busy async connector registered for the user under {@code jobId}.
   *
   * @return the removed connector, or absent when none was registered
   */
  private Optional<ActorRef> removeFromAsyncBusyPool(String username, String jobId) {
    Map<String, ActorRef> actors = asyncBusyConnections.get(username);
    if (actors == null) {
      return Optional.absent();
    }
    return Optional.fromNullable(actors.remove(jobId));
  }

  /** Appends the connector to the user's idle async queue. */
  private void addToAsyncAvailable(String username, ActorRef actor) {
    addToAvailable(asyncAvailableConnections, username, actor);
  }

  /** Appends the connector to the user's idle sync queue. */
  private void addToSyncAvailable(String username, ActorRef actor) {
    addToAvailable(syncAvailableConnections, username, actor);
  }

  /** Appends the connector to the user's idle queue in {@code pool}, creating the queue if needed. */
  private void addToAvailable(Map<String, Queue<ActorRef>> pool, String username, ActorRef actor) {
    Queue<ActorRef> availableActors = pool.get(username);
    if (availableActors == null) {
      availableActors = new LinkedList<ActorRef>();
      pool.put(username, availableActors);
    }
    availableActors.add(actor);
  }

  /** Removes the connector from the user's idle async queue, if present. */
  private void removeFromASyncAvailable(String username, ActorRef sender) {
    removeFromAvailable(asyncAvailableConnections, username, sender);
  }

  /** Removes the connector from the user's idle sync queue, if present. */
  private void removeFromSyncAvailable(String username, ActorRef sender) {
    removeFromAvailable(syncAvailableConnections, username, sender);
  }

  /** Removes the connector from the user's idle queue in {@code pool}; no-op for unknown users. */
  private void removeFromAvailable(Map<String, Queue<ActorRef>> pool, String username, ActorRef sender) {
    Queue<ActorRef> actors = pool.get(username);
    if (actors != null) {
      actors.remove(sender);
    }
  }

}
http://git-wip-us.apache.org/repos/asf/ambari/blob/853a1ce7/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/ResultSetIterator.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/ResultSetIterator.java b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/ResultSetIterator.java new file mode 100644 index 0000000..4b4a407 --- /dev/null +++ b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/ResultSetIterator.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/

package org.apache.ambari.view.hive20.actor;

import akka.actor.ActorRef;
import com.google.common.collect.Lists;
import org.apache.ambari.view.hive20.actor.message.CursorReset;
import org.apache.ambari.view.hive20.actor.message.HiveMessage;
import org.apache.ambari.view.hive20.actor.message.ResetCursor;
import org.apache.ambari.view.hive20.actor.message.job.FetchFailed;
import org.apache.ambari.view.hive20.actor.message.job.Next;
import org.apache.ambari.view.hive20.actor.message.job.NoMoreItems;
import org.apache.ambari.view.hive20.actor.message.job.Result;
import org.apache.ambari.view.hive20.actor.message.lifecycle.CleanUp;
import org.apache.ambari.view.hive20.actor.message.lifecycle.KeepAlive;
import org.apache.ambari.view.hive20.client.ColumnDescription;
import org.apache.ambari.view.hive20.client.ColumnDescriptionShort;
import org.apache.ambari.view.hive20.client.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.List;

/**
 * Actor that hands out the rows of a JDBC {@link ResultSet} in batches.
 * <p>
 * Each {@code Next} message is answered with a {@code Result} holding up to
 * {@code batchSize} rows, or with {@code NoMoreItems} once the result set is
 * exhausted. {@code ResetCursor} moves the cursor back before the first row.
 * Every incoming message also triggers a {@code KeepAlive} to the parent, and
 * failures are reported to the sender as {@code FetchFailed} followed by a
 * {@code CleanUp} to the parent.
 */
public class ResultSetIterator extends HiveActor {
  private final Logger LOG = LoggerFactory.getLogger(getClass());

  private static final int DEFAULT_BATCH_SIZE = 100;
  public static final String NULL = "NULL";

  private final ActorRef parent;
  private final ResultSet resultSet;
  private final int batchSize;

  private List<ColumnDescription> columnDescriptions;
  private int columnCount;
  boolean async = false;
  private boolean metaDataFetched = false;

  /**
   * @param parent    actor that receives {@code KeepAlive} and {@code CleanUp} messages
   * @param resultSet result set to iterate over
   * @param batchSize maximum number of rows returned per {@code Next}
   * @param isAsync   when {@code false}, reaching the end of the result set
   *                  also sends {@code CleanUp} to the parent
   */
  public ResultSetIterator(ActorRef parent, ResultSet resultSet, int batchSize, boolean isAsync) {
    this.parent = parent;
    this.resultSet = resultSet;
    this.batchSize = batchSize;
    this.async = isAsync;
  }

  /** Creates an async iterator with the default batch size. */
  public ResultSetIterator(ActorRef parent, ResultSet resultSet) {
    this(parent, resultSet, DEFAULT_BATCH_SIZE, true);
  }

  /** Creates an iterator with the default batch size. */
  public ResultSetIterator(ActorRef parent, ResultSet resultSet, boolean isAsync) {
    this(parent, resultSet, DEFAULT_BATCH_SIZE, isAsync);
  }

  /**
   * Sends a keep-alive to the parent for every message, then handles
   * {@code Next}, {@code ResetCursor} and {@code KeepAlive} payloads.
   */
  @Override
  public void handleMessage(HiveMessage hiveMessage) {
    sendKeepAlive();
    Object message = hiveMessage.getMessage();
    if (message instanceof Next) {
      getNext();
    }
    if (message instanceof ResetCursor) {
      resetResultSet();
    }

    // NOTE(review): together with the unconditional call above, a KeepAlive
    // payload results in two keep-alives to the parent; kept as in the original.
    if (message instanceof KeepAlive) {
      sendKeepAlive();
    }
  }

  /** Rewinds the cursor and confirms with {@code CursorReset}, or reports the failure. */
  private void resetResultSet() {
    try {
      resultSet.beforeFirst();
      sender().tell(new CursorReset(), self());
    } catch (SQLException e) {
      LOG.error("Failed to reset the cursor", e);
      sender().tell(new FetchFailed("Failed to reset the cursor", e), self());
      cleanUpResources();
    }
  }

  private void sendKeepAlive() {
    LOG.debug("Sending a keep alive to {}", parent);
    parent.tell(new KeepAlive(), self());
  }

  /**
   * Reads the next batch and replies to the sender with {@code Result},
   * {@code NoMoreItems} or {@code FetchFailed}. Column metadata is loaded on
   * the first call.
   */
  private void getNext() {
    if (!metaDataFetched) {
      try {
        initialize();
      } catch (SQLException ex) {
        LOG.error("Failed to fetch metadata for the ResultSet", ex);
        sender().tell(new FetchFailed("Failed to get metadata for ResultSet", ex), self());
        cleanUpResources();
        // Stop here: without metadata the rows cannot be read, and the sender
        // has already been told about the failure.
        return;
      }
    }

    List<Row> rows = Lists.newArrayList();
    try {
      // Test the batch limit BEFORE calling next(): next() advances the cursor,
      // so calling it on a full batch would silently skip one row per batch.
      while (rows.size() < batchSize && resultSet.next()) {
        rows.add(getRowFromResultSet(resultSet));
      }

      if (rows.isEmpty()) {
        // We have hit end of resultSet
        sender().tell(new NoMoreItems(), self());
        if (!async) {
          cleanUpResources();
        }
      } else {
        Result result = new Result(rows, columnDescriptions);
        sender().tell(result, self());
      }

    } catch (SQLException ex) {
      LOG.error("Failed to fetch next batch for the Resultset", ex);
      sender().tell(new FetchFailed("Failed to fetch next batch for the Resultset", ex), self());
      cleanUpResources();
    }
  }

  /** Asks the parent to clean up by sending it a {@code CleanUp} message. */
  private void cleanUpResources() {
    parent.tell(new CleanUp(), self());
  }

  /** Copies the current row's column values (JDBC columns are 1-based) into a {@code Row}. */
  private Row getRowFromResultSet(ResultSet resultSet) throws SQLException {
    Object[] values = new Object[columnCount];
    for (int i = 0; i < columnCount; i++) {
      values[i] = resultSet.getObject(i + 1);
    }
    return new Row(values);
  }

  /**
   * Loads the column count and per-column name/type descriptions from the
   * result set metadata.
   *
   * @throws SQLException if the metadata cannot be read; in that case the
   *                      iterator stays uninitialized
   */
  private void initialize() throws SQLException {
    ResultSetMetaData metaData = resultSet.getMetaData();
    columnCount = metaData.getColumnCount();
    columnDescriptions = Lists.newArrayList();
    for (int i = 1; i <= columnCount; i++) {
      String columnName = metaData.getColumnName(i);
      String typeName = metaData.getColumnTypeName(i);
      ColumnDescription description = new ColumnDescriptionShort(columnName, typeName, i);
      columnDescriptions.add(description);
    }
    // Mark as fetched only after every metadata call succeeded.
    metaDataFetched = true;
  }
}
