http://git-wip-us.apache.org/repos/asf/ambari/blob/af9654ff/contrib/views/hueambarimigration/src/main/java/org/apache/ambari/view/huetoambarimigration/service/configurationcheck/ConfFileReader.java ---------------------------------------------------------------------- diff --git a/contrib/views/hueambarimigration/src/main/java/org/apache/ambari/view/huetoambarimigration/service/configurationcheck/ConfFileReader.java b/contrib/views/hueambarimigration/src/main/java/org/apache/ambari/view/huetoambarimigration/service/configurationcheck/ConfFileReader.java new file mode 100644 index 0000000..ac76e1c --- /dev/null +++ b/contrib/views/hueambarimigration/src/main/java/org/apache/ambari/view/huetoambarimigration/service/configurationcheck/ConfFileReader.java @@ -0,0 +1,199 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.view.huetoambarimigration.service.configurationcheck; + +import java.io.BufferedReader; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.FileReader; +import java.io.IOException; +import java.io.InputStream; +import java.net.*; +import java.sql.Connection; +import java.sql.DriverManager; +import java.util.Properties; +import javax.ws.rs.core.Context; + +import org.apache.ambari.view.ViewContext; +import org.apache.ambari.view.AmbariStreamProvider; +import org.apache.ambari.view.URLStreamProvider; + + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.hdfs.DistributedFileSystem; + +import org.apache.ambari.view.huetoambarimigration.datasource.DataSourceAmbariDatabase; +import org.apache.ambari.view.huetoambarimigration.datasource.DataSourceHueDatabase; + +import org.apache.ambari.view.huetoambarimigration.model.*; +import org.apache.hadoop.hdfs.web.WebHdfsFileSystem; +import org.apache.log4j.Logger; + +public class ConfFileReader { + + static final Logger logger = Logger.getLogger(ConfFileReader.class); + + private static String homeDir = System.getProperty("java.io.tmpdir")+"/"; + + public static boolean checkConfigurationForHue(String hueURL) { + + URL url = null; + int resonseCode = 0; + try { + url = new URL(hueURL); + HttpURLConnection connection = (HttpURLConnection) url.openConnection(); + connection.setRequestMethod("GET"); //OR huc.setRequestMethod ("HEAD"); + connection.connect(); + resonseCode = connection.getResponseCode(); + + + } catch (MalformedURLException e) { + + logger.error("Error in accessing the URL:" , e); + + } catch (ProtocolException e) { + + logger.error("Error in protocol: ", e); + } catch (IOException e) { + + logger.error("IO Exception while establishing connection:",e); + } + + return resonseCode == 200 ; + } + + public static boolean checkConfigurationForAmbari(String ambariURL) { + + + URL url = null; + int responseCode = 0; + try { + url = new URL(ambariURL); + HttpURLConnection connection = (HttpURLConnection) url.openConnection(); + connection.setRequestMethod("GET"); //OR huc.setRequestMethod ("HEAD"); + connection.connect(); + responseCode = connection.getResponseCode(); + + } catch (MalformedURLException e) { + logger.error("Error in accessing the URL: " , e); + + } catch (ProtocolException e) { + logger.error("Error in protocol: ", e); + } catch (IOException e) { + logger.error("IO Exception while establishing connection: ",e); + } + return responseCode == 200 ; + + + } + + public static boolean checkHueDatabaseConnection(String hueDBDRiver, String hueJdbcUrl, String huedbUsername, String huedbPassword) throws IOException { + + try { + Connection con = DataSourceHueDatabase.getInstance(hueDBDRiver, hueJdbcUrl, huedbUsername, huedbPassword).getConnection(); + } + catch (Exception e) { + + logger.error("Sql exception in acessing Hue Database: " ,e); + return false; + } + + return true; + + } + + public static boolean checkAmbariDatbaseConection(String ambariDBDriver, String ambariDBJdbcUrl, String ambariDbUsername, String ambariDbPassword) throws IOException { + + + try { + + Connection con = DataSourceAmbariDatabase.getInstance(ambariDBDriver, ambariDBJdbcUrl, ambariDbUsername, ambariDbPassword).getConnection(); + + + } catch (Exception e) { + + logger.error("Sql exception in acessing Ambari Database: " ,e); + + return false; + } + + return true; + + } + + public static String getHomeDir() { + return homeDir; + } + + public static void setHomeDir(String homeDir) { + ConfFileReader.homeDir = homeDir; + } + + public static boolean checkNamenodeURIConnectionforambari(String ambariServerNameNode) throws IOException, URISyntaxException { + + + Configuration conf = new Configuration(); + conf.set("fs.hdfs.impl", + org.apache.hadoop.hdfs.DistributedFileSystem.class.getName()); + conf.set("fs.file.impl", + org.apache.hadoop.fs.LocalFileSystem.class.getName() + ); + + FileSystem fileSystem = FileSystem.get(new URI(ambariServerNameNode), conf); + + + if (fileSystem instanceof WebHdfsFileSystem) { + + return true; + + } else { + + return false; + } + + + } + + public static boolean checkNamenodeURIConnectionforHue(String hueServerNamenodeURI) throws IOException, URISyntaxException { + + Configuration conf = new Configuration(); + conf.set("fs.hdfs.impl", + org.apache.hadoop.hdfs.DistributedFileSystem.class.getName() + ); + conf.set("fs.file.impl", + org.apache.hadoop.fs.LocalFileSystem.class.getName() + ); + + FileSystem fileSystem = FileSystem.get(new URI(hueServerNamenodeURI), conf); + + + if (fileSystem instanceof WebHdfsFileSystem) { + + return true; + } else { + + return false; + } + + + } + + +}
http://git-wip-us.apache.org/repos/asf/ambari/blob/af9654ff/contrib/views/hueambarimigration/src/main/java/org/apache/ambari/view/huetoambarimigration/service/hive/HiveHistoryQueryImpl.java ---------------------------------------------------------------------- diff --git a/contrib/views/hueambarimigration/src/main/java/org/apache/ambari/view/huetoambarimigration/service/hive/HiveHistoryQueryImpl.java b/contrib/views/hueambarimigration/src/main/java/org/apache/ambari/view/huetoambarimigration/service/hive/HiveHistoryQueryImpl.java new file mode 100644 index 0000000..c959e8a --- /dev/null +++ b/contrib/views/hueambarimigration/src/main/java/org/apache/ambari/view/huetoambarimigration/service/hive/HiveHistoryQueryImpl.java @@ -0,0 +1,562 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.view.huetoambarimigration.service.hive; + +import java.net.URISyntaxException; +import java.security.PrivilegedExceptionAction; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.io.BufferedInputStream; +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileWriter; +import java.io.IOException; +import java.io.InputStream; +import java.util.Calendar; +import java.util.Date; +import java.util.GregorianCalendar; +import java.util.Scanner; + +import org.apache.ambari.view.huetoambarimigration.service.configurationcheck.ConfFileReader; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; +import org.apache.log4j.Logger; +import org.jdom.Attribute; +import org.jdom.Document; +import org.jdom.Element; +import org.jdom.JDOMException; +import org.jdom.input.SAXBuilder; +import org.jdom.output.Format; +import org.jdom.output.XMLOutputter; + +public class HiveHistoryQueryImpl { + + static final Logger logger = Logger.getLogger(HiveHistoryQueryImpl.class); + + public void wrtitetoalternatesqlfile(String dirname, String content, String instance, int i) throws IOException { + + Date dNow = new Date(); + SimpleDateFormat ft = new SimpleDateFormat("YYYY-MM-dd hh:mm:ss"); + String currentDate = ft.format(dNow); + + XMLOutputter xmlOutput = new XMLOutputter(); + xmlOutput.setFormat(Format.getPrettyFormat()); + + File xmlfile = new File("/var/lib/huetoambari/RevertChange.xml"); + + if (xmlfile.exists()) { + String iteration = Integer.toString(i + 1); + SAXBuilder builder = new SAXBuilder(); + Document doc; + try { + doc = (Document) builder.build(xmlfile); + Element rootNode = doc.getRootElement(); + Element record = new Element("RevertRecord"); + record.setAttribute(new Attribute("id", iteration)); + record.addContent(new Element("datetime").setText(currentDate.toString())); + record.addContent(new Element("dirname").setText(dirname)); + record.addContent(new Element("instance").setText(instance)); + record.addContent(new Element("query").setText(content)); + rootNode.addContent(record); + xmlOutput.output(doc, new FileWriter(ConfFileReader.getHomeDir() + "RevertChange.xml")); + + } catch (JDOMException e) { + logger.error("JDOMException" ,e); + + } + + } else { + + try { + String iteration = Integer.toString(i + 1); + Element revertrecord = new Element("RevertChangePage"); + Document doc = new Document(revertrecord); + doc.setRootElement(revertrecord); + + Element record = new Element("RevertRecord"); + record.setAttribute(new Attribute("id", iteration)); + record.addContent(new Element("datetime").setText(currentDate.toString())); + record.addContent(new Element("dirname").setText(dirname)); + record.addContent(new Element("instance").setText(instance)); + record.addContent(new Element("query").setText(content)); + doc.getRootElement().addContent(record); + xmlOutput.output(doc, new FileWriter(ConfFileReader.getHomeDir() + "RevertChange.xml")); + } catch (IOException io) { + logger.error("JDOMException" , io); + } + + } + + } + + public int fetchMaximumIdfromAmbaridb(String driverName, Connection c, int id) throws SQLException { + + String ds_id = null; + Statement stmt = null; + stmt = c.createStatement(); + ResultSet rs = null; + + if (driverName.contains("postgresql")) { + rs = stmt.executeQuery("select MAX(cast(ds_id as integer)) as max from ds_jobimpl_" + id + ";"); + } else if (driverName.contains("mysql")) { + rs = stmt.executeQuery("select max( cast(ds_id as unsigned) ) as max from DS_JOBIMPL_" + id + ";"); + } else if (driverName.contains("oracle")) { + rs = stmt.executeQuery("select MAX(cast(ds_id as integer)) as max from ds_jobimpl_" + id); + } + + while (rs.next()) { + ds_id = rs.getString("max"); + } + + int num; + if (ds_id == null) { + num = 1; + } else { + num = Integer.parseInt(ds_id); + } + return num; + } + + public void insertRowinAmbaridb(String driverName, String dirname, int maxcount, long epochtime, Connection c, int id, String instance, int i) throws SQLException, IOException { + + String maxcount1 = Integer.toString(maxcount); + String epochtime1 = Long.toString(epochtime); + String ds_id = new String(); + Statement stmt = null; + String sql = ""; + String revsql = ""; + stmt = c.createStatement(); + + if (driverName.contains("mysql")) { + sql = "INSERT INTO DS_JOBIMPL_" + id + " values ('" + maxcount1 + + "','','','','','default'," + epochtime1 + ",0,'','','" + + dirname + "logs','admin','" + dirname + + "query.hql','','job','','','Unknown','" + dirname + + "','','Worksheet');"; + revsql = "delete from DS_JOBIMPL_" + id + " where ds_id='" + maxcount1 + "';"; + + } else if (driverName.contains("postgresql")) { + sql = "INSERT INTO ds_jobimpl_" + id + " values ('" + maxcount1 + + "','','','','','default'," + epochtime1 + ",0,'','','" + + dirname + "logs','admin','" + dirname + + "query.hql','','job','','','Unknown','" + dirname + + "','','Worksheet');"; + revsql = "delete from ds_jobimpl_" + id + " where ds_id='" + maxcount1 + "';"; + + } else if (driverName.contains("oracle")) { + sql = "INSERT INTO ds_jobimpl_" + id + " values ('" + maxcount1 + + "','','','','','default'," + epochtime1 + ",0,'','','" + + dirname + "logs','admin','" + dirname + + "query.hql','','job','','','Unknown','" + dirname + + "','','Worksheet')"; + revsql = "delete from ds_jobimpl_" + id + " where ds_id='" + maxcount1 + "'"; + + } + wrtitetoalternatesqlfile(dirname, revsql, instance, i); + + stmt.executeUpdate(sql); + + } + + public int fetchInstanceTablename(String driverName, Connection c, String instance) throws SQLException { + + String ds_id = new String(); + int id = 0; + Statement stmt = null; + stmt = c.createStatement(); + ResultSet rs = null; + + if (driverName.contains("oracle")) { + rs = stmt.executeQuery("select id from viewentity where class_name LIKE 'org.apache.ambari.view.hive.resources.jobs.viewJobs.JobImpl' and view_instance_name='" + instance + "'"); + } else { + rs = stmt.executeQuery("select id from viewentity where class_name LIKE 'org.apache.ambari.view.hive.resources.jobs.viewJobs.JobImpl' and view_instance_name='" + instance + "';"); + } + + while (rs.next()) { + id = rs.getInt("id"); + } + return id; + } + + public long getEpochTime() throws ParseException { + long seconds = System.currentTimeMillis() / 1000l; + return seconds; + + } + + public String getTime() throws ParseException { + int day, month, year; + int second, minute, hour; + int milisecond; + GregorianCalendar date = new GregorianCalendar(); + + day = date.get(Calendar.DAY_OF_MONTH); + month = date.get(Calendar.MONTH); + year = date.get(Calendar.YEAR); + + second = date.get(Calendar.SECOND); + minute = date.get(Calendar.MINUTE); + hour = date.get(Calendar.HOUR); + milisecond = date.get(Calendar.MILLISECOND); + + String s = year + "-" + (month + 1) + "-" + day + "_" + hour + "-" + + minute; + String s1 = year + "-" + (month + 1) + "-" + day + "_" + hour + "-" + + minute + "-" + second + "-" + milisecond; + SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss-SSS"); + Date date1 = df.parse(s1); + long epoch = date1.getTime(); + return s; + + } + + public String[] fetchFromHue(String username, String startdate, String endtime, Connection connection) throws ClassNotFoundException, SQLException { + int id = 0; + int i = 0; + String[] query = new String[100]; + + try { + connection.setAutoCommit(false); + Statement statement = connection.createStatement(); + + ResultSet rs1 = null; + if (username.equals("all")) { + } else { + ResultSet rs = statement.executeQuery("select id from auth_user where username='" + username + "';"); + while (rs.next()) { + id = rs.getInt("id"); + } + } + if (startdate.equals("") && endtime.equals("")) { + if (username.equals("all")) { + rs1 = statement.executeQuery("select query from beeswax_queryhistory;"); + } else { + rs1 = statement.executeQuery("select query from beeswax_queryhistory where owner_id =" + id + ";"); + } + + } else if (!(startdate.equals("")) && !(endtime.equals(""))) { + if (username.equals("all")) { + rs1 = statement.executeQuery("select query from beeswax_queryhistory where submission_date >= date('" + startdate + "') AND submission_date < date('" + endtime + "');"); + } else { + rs1 = statement.executeQuery("select query from beeswax_queryhistory where owner_id =" + id + " AND submission_date >= date('" + startdate + "') AND submission_date <= date('" + endtime + "');"); + } + } else if (!(startdate.equals("")) && (endtime.equals(""))) { + if (username.equals("all")) { + rs1 = statement.executeQuery("select query from beeswax_queryhistory where submission_date >= date('" + startdate + "');"); + } else { + rs1 = statement.executeQuery("select query from beeswax_queryhistory where owner_id =" + id + " AND submission_date >= date('" + startdate + "');"); + } + + } else if ((startdate.equals("")) && !(endtime.equals(""))) { + if (username.equals("all")) { + rs1 = statement.executeQuery("select query from beeswax_queryhistory where submission_date < date('" + endtime + "');"); + } else { + rs1 = statement.executeQuery("select query from beeswax_queryhistory where owner_id =" + id + " AND submission_date < date('" + endtime + "');"); + } + } + + + while (rs1.next()) { + query[i] = rs1.getString("query"); + i++; + } + + connection.commit(); + + } catch (SQLException e) { + connection.rollback(); + + } finally { + try { + if (connection != null) + connection.close(); + } catch (SQLException e) { + logger.error("Sql exception error: " + e); + } + } + return query; + + } + + public void writetoFileQueryhql(String content, String homedir) { + try { + File file = new File(homedir + "query.hql"); + // if file doesnt exists, then create it + if (!file.exists()) { + file.createNewFile(); + } + FileWriter fw = new FileWriter(file.getAbsoluteFile()); + BufferedWriter bw = new BufferedWriter(fw); + bw.write(content); + bw.close(); + } catch (IOException e) { + logger.error("IOException" , e); + } + + } + + public void deleteFileQueryhql(String homedir) { + try{ + File file = new File(homedir + "query.hql"); + + if(file.delete()){ + logger.info("temporary hql file deleted"); + }else{ + logger.info("temporary hql file delete failed"); + } + + }catch(Exception e){ + + logger.error("File Exception ",e); + + } + + } + + public void deleteFileQueryLogs(String homedir) { + try{ + File file = new File(homedir + "logs"); + + if(file.delete()){ + logger.info("temporary logs file deleted"); + }else{ + logger.info("temporary logs file delete failed"); + } + + }catch(Exception e){ + + logger.error("File Exception ",e); + + } + + } + + public void writetoFileLogs(String homedir) { + try { + String content = ""; + File file = new File(homedir + "logs"); + // if file doesnt exists, then create it + if (!file.exists()) { + file.createNewFile(); + } + FileWriter fw = new FileWriter(file.getAbsoluteFile()); + BufferedWriter bw = new BufferedWriter(fw); + bw.write(content); + bw.close(); + } catch (IOException e) { + logger.error("IOException" , e); + } + + } + + public void createDir(final String dir, final String namenodeuri) throws IOException, + URISyntaxException { + + try { + final Configuration conf = new Configuration(); + + conf.set("fs.hdfs.impl", + org.apache.hadoop.hdfs.DistributedFileSystem.class.getName() + ); + conf.set("fs.file.impl", + org.apache.hadoop.fs.LocalFileSystem.class.getName() + ); + conf.set("fs.defaultFS", namenodeuri); + conf.set("hadoop.job.ugi", "hdfs"); + UserGroupInformation.setConfiguration(conf); + + UserGroupInformation ugi = UserGroupInformation.createRemoteUser("hdfs"); + + ugi.doAs(new PrivilegedExceptionAction<Boolean>() { + + public Boolean run() throws Exception { + + FileSystem fs = FileSystem.get(conf); + Path src = new Path(dir); + Boolean b = fs.mkdirs(src); + return b; + } + }); + } catch (Exception e) { + logger.error("Exception in Webhdfs" , e); + } + } + + public void createDirKerberorisedSecured(final String dir, final String namenodeuri) throws IOException, + URISyntaxException { + + try { + final Configuration conf = new Configuration(); + + conf.set("fs.hdfs.impl", + org.apache.hadoop.hdfs.DistributedFileSystem.class.getName() + ); + conf.set("fs.file.impl", + org.apache.hadoop.fs.LocalFileSystem.class.getName() + ); + conf.set("fs.defaultFS", namenodeuri); + conf.set("hadoop.job.ugi", "hdfs"); + conf.set("hadoop.security.authentication", "Kerberos"); + UserGroupInformation.setConfiguration(conf); + UserGroupInformation ugi = UserGroupInformation.createRemoteUser("hdfs"); + ugi.doAs(new PrivilegedExceptionAction<Boolean>() { + + public Boolean run() throws Exception { + FileSystem fs = FileSystem.get(conf); + Path src = new Path(dir); + Boolean b = fs.mkdirs(src); + return b; + } + }); + } catch (Exception e) { + logger.error("Exception in Webhdfs" , e); + } + } + + + public void putFileinHdfs(final String source, final String dest, final String namenodeuri) + throws IOException { + + try { + UserGroupInformation ugi = UserGroupInformation.createRemoteUser("hdfs"); + + ugi.doAs(new PrivilegedExceptionAction<Void>() { + + public Void run() throws Exception { + + Configuration conf = new Configuration(); + conf.set("fs.defaultFS", namenodeuri); + conf.set("hadoop.job.ugi", "hdfs"); + conf.set("fs.hdfs.impl", + org.apache.hadoop.hdfs.DistributedFileSystem.class.getName() + ); + conf.set("fs.file.impl", + org.apache.hadoop.fs.LocalFileSystem.class.getName() + ); + FileSystem fileSystem = FileSystem.get(conf); + + String filename = source.substring( + source.lastIndexOf('/') + 1, source.length()); + String dest1; + if (dest.charAt(dest.length() - 1) != '/') { + dest1 = dest + "/" + filename; + } else { + dest1 = dest + filename; + } + + Path path = new Path(dest1); + if (fileSystem.exists(path)) { + + } + // Path pathsource = new Path(source); + FSDataOutputStream out = fileSystem.create(path); + + InputStream in = new BufferedInputStream( + new FileInputStream(new File(source))); + + byte[] b = new byte[1024]; + int numBytes = 0; + while ((numBytes = in.read(b)) > 0) { + out.write(b, 0, numBytes); + } + in.close(); + out.close(); + fileSystem.close(); + return null; + } + }); + } catch (Exception e) { + logger.error("Webhdfs exception" , e); + } + + } + + public void putFileinHdfsKerborizedSecured(final String source, final String dest, final String namenodeuri) + throws IOException { + + try { + + final Configuration conf = new Configuration(); + + conf.set("fs.hdfs.impl", + org.apache.hadoop.hdfs.DistributedFileSystem.class.getName() + ); + conf.set("fs.file.impl", + org.apache.hadoop.fs.LocalFileSystem.class.getName() + ); + conf.set("fs.defaultFS", namenodeuri); + conf.set("hadoop.job.ugi", "hdfs"); + conf.set("hadoop.security.authentication", "Kerberos"); + UserGroupInformation.setConfiguration(conf); + UserGroupInformation ugi = UserGroupInformation.createRemoteUser("hdfs"); + + ugi.doAs(new PrivilegedExceptionAction<Void>() { + + public Void run() throws Exception { + + FileSystem fileSystem = FileSystem.get(conf); + + String filename = source.substring( + source.lastIndexOf('/') + 1, source.length()); + String dest1; + if (dest.charAt(dest.length() - 1) != '/') { + dest1 = dest + "/" + filename; + } else { + dest1 = dest + filename; + } + + Path path = new Path(dest1); + if (fileSystem.exists(path)) { + + } + + FSDataOutputStream out = fileSystem.create(path); + + InputStream in = new BufferedInputStream( + new FileInputStream(new File(source))); + + byte[] b = new byte[1024]; + int numBytes = 0; + while ((numBytes = in.read(b)) > 0) { + out.write(b, 0, numBytes); + } + in.close(); + out.close(); + fileSystem.close(); + return null; + } + }); + } catch (Exception e) { + logger.error("Webhdfs exception" , e); + + } + + } + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/af9654ff/contrib/views/hueambarimigration/src/main/java/org/apache/ambari/view/huetoambarimigration/service/hive/HiveSavedQueryImpl.java ---------------------------------------------------------------------- diff --git a/contrib/views/hueambarimigration/src/main/java/org/apache/ambari/view/huetoambarimigration/service/hive/HiveSavedQueryImpl.java b/contrib/views/hueambarimigration/src/main/java/org/apache/ambari/view/huetoambarimigration/service/hive/HiveSavedQueryImpl.java new file mode 100644 index 0000000..3ad481d --- /dev/null +++ b/contrib/views/hueambarimigration/src/main/java/org/apache/ambari/view/huetoambarimigration/service/hive/HiveSavedQueryImpl.java @@ -0,0 +1,778 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.view.huetoambarimigration.service.hive; + +import java.nio.charset.Charset; +import java.security.PrivilegedExceptionAction; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.io.BufferedInputStream; +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.ByteArrayInputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileWriter; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.util.ArrayList; +import java.util.Calendar; +import java.util.Date; +import java.util.GregorianCalendar; +import java.util.Scanner; +import java.io.*; +import java.net.URISyntaxException; +import java.net.URL; + +import org.apache.ambari.view.huetoambarimigration.service.configurationcheck.ConfFileReader; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; +import org.apache.log4j.Logger; +import org.jdom.Attribute; +import org.jdom.Document; +import org.jdom.Element; +import org.jdom.JDOMException; +import org.jdom.input.SAXBuilder; +import org.jdom.output.Format; +import org.jdom.output.XMLOutputter; +import org.json.JSONArray; +import org.json.JSONObject; +import org.apache.hadoop.security.authentication.client.AuthenticatedURL; +import org.apache.hadoop.security.UserGroupInformation; + +import org.apache.ambari.view.huetoambarimigration.model.*; + +public class HiveSavedQueryImpl { + + static final Logger logger = Logger.getLogger(HiveSavedQueryImpl.class); + + private static String readAll(Reader rd) throws IOException { + StringBuilder sb = new StringBuilder(); + int cp; + while ((cp = rd.read()) != -1) { + sb.append((char) cp); + } + return sb.toString(); + } + + public void wrtitetoalternatesqlfile(String dirname, String content, + String instance, int i) throws IOException { + + Date dNow = new Date(); + SimpleDateFormat ft = new SimpleDateFormat("YYYY-MM-dd hh:mm:ss"); + String currentDate = ft.format(dNow); + + XMLOutputter xmlOutput = new XMLOutputter(); + + xmlOutput.setFormat(Format.getPrettyFormat()); + + File xmlfile = new File(ConfFileReader.getHomeDir() + "RevertChange.xml"); + + if (xmlfile.exists()) { + String iteration = Integer.toString(i + 1); + SAXBuilder builder = new SAXBuilder(); + Document doc; + try { + doc = (Document) builder.build(xmlfile); + + Element rootNode = doc.getRootElement(); + + Element record = new Element("RevertRecord"); + record.setAttribute(new Attribute("id", iteration)); + record.addContent(new Element("datetime").setText(currentDate + .toString())); + record.addContent(new Element("dirname").setText(dirname)); + record.addContent(new Element("instance").setText(instance)); + record.addContent(new Element("query").setText(content)); + + rootNode.addContent(record); + xmlOutput.output(doc, new FileWriter(ConfFileReader.getHomeDir() + "RevertChange.xml")); + + } catch (JDOMException e) { + // TODO Auto-generated catch block + logger.error("JDOMException: " , e); + } + + } else { + + try { + String iteration = Integer.toString(i + 1); + Element revertrecord = new Element("RevertChangePage"); + Document doc = new Document(revertrecord); + doc.setRootElement(revertrecord); + + Element record = new Element("RevertRecord"); + record.setAttribute(new Attribute("id", iteration)); + record.addContent(new Element("datetime").setText(currentDate + .toString())); + record.addContent(new Element("dirname").setText(dirname)); + record.addContent(new Element("instance").setText(instance)); + record.addContent(new Element("query").setText(content)); + + doc.getRootElement().addContent(record); + + xmlOutput.output(doc, new FileWriter(ConfFileReader.getHomeDir() + "RevertChange.xml")); + + } catch (IOException io) { + + } + + } + + } + + public int fetchMaxidforSavedQueryHive(String driverName, Connection c, int id) + throws SQLException { + + String ds_id = null; + Statement stmt = null; + stmt = c.createStatement(); + ResultSet rs = null; + + if (driverName.contains("postgresql")) { + rs = stmt.executeQuery("select MAX(cast(ds_id as integer)) as max from ds_savedquery_" + id + ";"); + } else if (driverName.contains("mysql")) { + rs = stmt.executeQuery("select max(cast(ds_id as unsigned) ) as max from DS_SAVEDQUERY_" + id + ";"); + } else if (driverName.contains("oracle")) { + rs = stmt.executeQuery("select MAX(cast(ds_id as integer)) as max from ds_savedquery_" + id + ";"); + } + + while (rs.next()) { + ds_id = rs.getString("max"); + + } + + int num; + if (ds_id == null) { + num = 1; + } else { + num = Integer.parseInt(ds_id); + } + + return num; + } + + public int fetchInstancetablenameForSavedqueryHive(String driverName, Connection c, + String instance) throws SQLException { + + String ds_id = new String(); + int id = 0; + Statement stmt = null; + + stmt = c.createStatement(); + ResultSet rs = null; + + if (driverName.contains("oracle")) { + rs = stmt + .executeQuery("select * from viewentity where class_name LIKE 'org.apache.ambari.view.hive.resources.savedQueries.SavedQuery' and view_instance_name='" + + instance + "'"); + } else { + rs = stmt + .executeQuery("select * from viewentity where class_name LIKE 'org.apache.ambari.view.hive.resources.savedQueries.SavedQuery' and view_instance_name='" + + instance + "';"); + } + + + while (rs.next()) { + id = rs.getInt("id"); + + } + + return id; + } + + public int fetchInstanceTablenameHiveHistory(String driverName, Connection c, + String instance) throws SQLException { + String ds_id = new String(); + int id = 0; + Statement stmt = null; + + + stmt = c.createStatement(); + ResultSet rs = null; + + if (driverName.contains("oracle")) { + rs = stmt.executeQuery("select id from viewentity where class_name LIKE 'org.apache.ambari.view.hive.resources.jobs.viewJobs.JobImpl' and view_instance_name='" + instance + "'"); + } else { + rs = stmt.executeQuery("select id from viewentity where class_name LIKE 'org.apache.ambari.view.hive.resources.jobs.viewJobs.JobImpl' and view_instance_name='" + instance + "';"); + } + + + while (rs.next()) { + id = rs.getInt("id"); + System.out.println("id is " + id); + + } + + return id; + + } + + public int fetchMaxdsidFromHiveHistory(String driverName, Connection c, int id) + throws SQLException { + + String ds_id = null; + Statement stmt = null; + + stmt = c.createStatement(); + ResultSet rs = null; + + if (driverName.contains("postgresql")) { + rs = stmt.executeQuery("select MAX(cast(ds_id as integer)) as max from ds_jobimpl_" + id + ";"); + } else if (driverName.contains("mysql")) { + rs = stmt.executeQuery("select max( cast(ds_id as unsigned) ) as max from DS_JOBIMPL_" + id + ";"); + } else if (driverName.contains("oracle")) { + rs = stmt.executeQuery("select MAX(cast(ds_id as integer)) as max from ds_jobimpl_" + id); + } + while (rs.next()) { + ds_id = rs.getString("max"); + } + int num; + if (ds_id == null) { + num = 1; + } else { + num = Integer.parseInt(ds_id); + } + return num; + } + + + /**/ + public void insertRowHiveHistory(String driverName, String dirname, int maxcount, + long epochtime, Connection c, int id, String instance, int i) + throws SQLException, IOException { + String maxcount1 = Integer.toString(maxcount); + + String epochtime1 = Long.toString(epochtime); + + String ds_id = new String(); + Statement stmt = null; + + stmt = c.createStatement(); + String sql = ""; + String revsql = ""; + + if (driverName.contains("mysql")) { + sql = "INSERT INTO DS_JOBIMPL_" + id + " values ('" + maxcount1 + + "','','','','','default'," + epochtime1 + ",0,'','','" + + dirname + "logs','admin','" + dirname + + "query.hql','','job','','','Unknown','" + dirname + + "','','Worksheet');"; + + revsql = "delete from DS_JOBIMPL_" + id + " where ds_id='" + + maxcount1 + "';"; + + } else if (driverName.contains("postgresql")) { + sql = "INSERT INTO ds_jobimpl_" + id + " values ('" + maxcount1 + + "','','','','','default'," + epochtime1 + ",0,'','','" + + dirname + "logs','admin','" + dirname + + "query.hql','','job','','','Unknown','" + dirname + + "','','Worksheet');"; + + revsql = "delete from ds_jobimpl_" + id + " where ds_id='" + + maxcount1 + "';"; + + } else if (driverName.contains("oracle")) { + sql = "INSERT INTO ds_jobimpl_" + id + " values ('" + maxcount1 + + "','','','','','default'," + epochtime1 + ",0,'','','" + + dirname + "logs','admin','" + dirname + + "query.hql','','job','','','Unknown','" + dirname + + "','','Worksheet')"; + revsql = "delete from ds_jobimpl_" + id + " where ds_id='" + + maxcount1 + "'"; + + } + stmt.executeUpdate(sql); + wrtitetoalternatesqlfile(dirname, revsql, instance, i); + } + + public void insertRowinSavedQuery(String driverName, int maxcount, String database, + String dirname, String query, String name, Connection c, int id, + String instance, int i) throws SQLException, IOException { + String maxcount1 = Integer.toString(maxcount); + + String ds_id = new String(); + Statement stmt = null; + String sql = ""; + String revsql = ""; + stmt = c.createStatement(); + + if (driverName.contains("mysql")) { + sql = "INSERT INTO DS_SAVEDQUERY_" + id + " values ('" + + maxcount1 + "','" + database + "','" + "admin" + "','" + + dirname + "query.hql','" + query + "','" + name + "');"; + + revsql = "delete from DS_SAVEDQUERY_" + id + " where ds_id='" + + maxcount1 + "';"; + + } else if (driverName.contains("postgresql")) { + sql = "INSERT INTO ds_savedquery_" + id + " values ('" + + maxcount1 + "','" + database + "','" + "admin" + "','" + + dirname + "query.hql','" + query + "','" + name + "');"; + + revsql = "delete from ds_savedquery_" + id + " where ds_id='" + + maxcount1 + "';"; + + } else if (driverName.contains("oracle")) { + sql = "INSERT INTO ds_savedquery_" + id + " values ('" + + maxcount1 + "','" + database + "','" + "admin" + "','" + + dirname + "query.hql','" + query + "','" + name + "')"; + + revsql = "delete from ds_savedquery_" + id + " where ds_id='" + + maxcount1 + "'"; + + } + wrtitetoalternatesqlfile(dirname, revsql, instance, i); + stmt.executeUpdate(sql); + } + + public long getEpochTime() throws ParseException { + + long seconds = System.currentTimeMillis() / 1000l; + return seconds; + + } + + public String getTime() throws ParseException { + int day, month, year; + int second, minute, hour; + int milisecond; + GregorianCalendar date = new GregorianCalendar(); + + day = date.get(Calendar.DAY_OF_MONTH); + month = date.get(Calendar.MONTH); + year = date.get(Calendar.YEAR); + + second = date.get(Calendar.SECOND); + minute = date.get(Calendar.MINUTE); + hour = date.get(Calendar.HOUR); + milisecond = date.get(Calendar.MILLISECOND); + + String s = year + "-" + (month + 1) + "-" + day + "_" + hour + "-" + + minute; + String s1 = year + "-" + (month + 1) + "-" + day + "_" + hour + "-" + + minute + "-" + second + "-" + milisecond; + SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss-SSS"); + Date date1 = df.parse(s1); + long epoch = date1.getTime(); + + return s; + + } + + public ArrayList<PojoHive> fetchFromHuedb(String username, + String startdate, String endtime, Connection connection) + throws ClassNotFoundException, IOException { + int id = 0; + int i = 0; + String[] query = new String[100]; + ArrayList<PojoHive> hiveArrayList = new ArrayList<PojoHive>(); + ResultSet rs1 = null; + + try { + Statement statement = connection.createStatement(); + if (username.equals("all")) { + } else { + ResultSet rs = statement + .executeQuery("select id from auth_user where username='" + + username + "';"); + while (rs.next()) { + + id = rs.getInt("id"); + + } + + } + if (startdate.equals("") && endtime.equals("")) { + if (username.equals("all")) { + rs1 = statement + .executeQuery("select data,name,owner_id from beeswax_savedquery;"); + + } else { + rs1 = statement + .executeQuery("select data,name,owner_id from beeswax_savedquery where name!='My saved query'and owner_id =" + + id + ";"); + } + + } else if (!(startdate.equals("")) && !(endtime.equals(""))) { + if (username.equals("all")) { + rs1 = statement + .executeQuery("select data,name,owner_id from beeswax_savedquery where name!='My saved query' AND mtime >= date('" + + startdate + + "') AND mtime <= date('" + + endtime + "');"); + } else { + rs1 = statement + .executeQuery("select data,name,owner_id from beeswax_savedquery where name!='My saved query'and owner_id =" + + id + + " AND mtime >= date('" + + startdate + + "') AND mtime <= date('" + + endtime + + "');"); + } + + } else if (!(startdate.equals("")) && (endtime.equals(""))) { + if (username.equals("all")) { + rs1 = statement + .executeQuery("select data,name,owner_id from beeswax_savedquery where name!='My saved query'and mtime >= date('" + + startdate + "');"); + } else { + rs1 = statement + .executeQuery("select data,name,owner_id from beeswax_savedquery where name!='My saved query'and owner_id =" + + id + + " AND mtime >= date('" + + startdate + + "');"); + } + + } else if ((startdate.equals("")) && !(endtime.equals(""))) { + if (username.equals("all")) { + rs1 = statement + .executeQuery("select data,name,owner_id from beeswax_savedquery where name!='My saved query' AND mtime <= date('" + + endtime + "');"); + } else { + rs1 = statement + .executeQuery("select data,name,owner_id from beeswax_savedquery where name!='My saved query'and owner_id =" + + id + + " AND mtime <= date('" + + endtime + + "');"); + } + + } + while (rs1.next()) { + PojoHive hivepojo = new PojoHive(); + String name = rs1.getString("name"); + String temp = rs1.getString("data"); + InputStream is = new ByteArrayInputStream(temp.getBytes()); + BufferedReader rd = new BufferedReader(new InputStreamReader( + is, Charset.forName("UTF-8"))); + String jsonText = readAll(rd); + JSONObject json = new JSONObject(jsonText); + String resources = json.get("query").toString(); + json = new JSONObject(resources); + + String resarr = (json.get("query")).toString(); + + json = new JSONObject(resources); + String database = (json.get("database")).toString(); + hivepojo.setQuery(resarr); + hivepojo.setDatabase(database); + hivepojo.setOwner(name); + hiveArrayList.add(hivepojo); + i++; + } + + } catch (SQLException e) { + // if the error message is "out of memory", + // it probably means no database file is found + System.err.println(e.getMessage()); + } finally { + try { + if (connection != null) + connection.close(); + } catch (SQLException e) { + logger.error("sql connection exception" , e); + } + } + + return hiveArrayList; + + } + + + public void writetoFilequeryHql(String content, String homedir) { + try { + File file = new File(homedir + "query.hql"); + if (!file.exists()) { + file.createNewFile(); + } + + FileWriter fw = new FileWriter(file.getAbsoluteFile()); + BufferedWriter bw = new BufferedWriter(fw); + bw.write(content); + bw.close(); + + } catch (IOException e) { + logger.error("IOException: " , e); + } + + } + + public void deleteFileQueryhql(String homedir) { + try{ + File file = new File(homedir + "query.hql"); + + if(file.delete()){ + logger.info("temporary hql file deleted"); + }else{ + logger.info("temporary hql file delete failed"); + } + + }catch(Exception e){ + + logger.error("File Exception ",e); + + } + + } + + public void deleteFileQueryLogs(String homedir) { + try{ + File file = new File(homedir + "logs"); + + if(file.delete()){ + logger.info("temporary logs file deleted"); + }else{ + logger.info("temporary logs file delete failed"); + } + + }catch(Exception e){ + + logger.error("File Exception ",e); + + } + + } + + + public void writetoFileLogs(String homedir) { + try { + + String content = ""; + File file = new File(homedir + "logs"); + + // if file doesnt exists, then create it + if (!file.exists()) { + file.createNewFile(); + } + + FileWriter fw = new FileWriter(file.getAbsoluteFile()); + BufferedWriter bw = new BufferedWriter(fw); + bw.write(content); + bw.close(); + + } catch (IOException e) { + logger.error("IOException: " , e); + } + + } + + public void createDirHive(final String dir, final String namenodeuri) + throws IOException, URISyntaxException { + + try { + final Configuration conf = new Configuration(); + + conf.set("fs.hdfs.impl", + org.apache.hadoop.hdfs.DistributedFileSystem.class.getName() + ); + conf.set("fs.file.impl", + org.apache.hadoop.fs.LocalFileSystem.class.getName() + ); + conf.set("fs.defaultFS", namenodeuri); + conf.set("hadoop.job.ugi", "hdfs"); + conf.set("hadoop.security.authentication", "Kerberos"); + + UserGroupInformation.setConfiguration(conf); + UserGroupInformation ugi = UserGroupInformation.createRemoteUser("hdfs"); + + ugi.doAs(new PrivilegedExceptionAction<Void>() { + + public Void run() throws Exception { + + FileSystem fs = FileSystem.get(conf); + Path src = new Path(dir); + fs.mkdirs(src); + return null; + } + }); + } catch (Exception e) { + logger.error("Webhdfs: " , e); + } + } + + public void createDirHiveSecured(final String dir, final String namenodeuri) + throws IOException, URISyntaxException { + + try { + final Configuration conf = new Configuration(); + + conf.set("fs.hdfs.impl", + org.apache.hadoop.hdfs.DistributedFileSystem.class.getName() + ); + conf.set("fs.file.impl", + org.apache.hadoop.fs.LocalFileSystem.class.getName() + ); + conf.set("fs.defaultFS", namenodeuri); + conf.set("hadoop.job.ugi", "hdfs"); + conf.set("hadoop.security.authentication", "Kerberos"); + + UserGroupInformation.setConfiguration(conf); + UserGroupInformation ugi = UserGroupInformation.createRemoteUser("hdfs"); + + ugi.doAs(new PrivilegedExceptionAction<Void>() { + + public Void run() throws Exception { + + FileSystem fs = FileSystem.get(conf); + Path src = new Path(dir); + fs.mkdirs(src); + return null; + } + }); + } catch (Exception e) { + logger.error("Webhdfs: " , e); + } + } + + public void putFileinHdfs(final String source, final String dest, + final String namenodeuri) throws IOException { + + try { + final Configuration conf = new Configuration(); + + conf.set("fs.hdfs.impl", + org.apache.hadoop.hdfs.DistributedFileSystem.class.getName() + ); + conf.set("fs.file.impl", + org.apache.hadoop.fs.LocalFileSystem.class.getName() + ); + conf.set("fs.defaultFS", namenodeuri); + conf.set("hadoop.job.ugi", "hdfs"); + conf.set("hadoop.security.authentication", "Kerberos"); + + UserGroupInformation.setConfiguration(conf); + UserGroupInformation ugi = UserGroupInformation.createRemoteUser("hdfs"); + ugi.doAs(new PrivilegedExceptionAction<Void>() { + + public Void run() throws Exception { + + FileSystem fileSystem = FileSystem.get(conf); + String filename = source.substring( + source.lastIndexOf('/') + 1, source.length()); + String dest1; + if (dest.charAt(dest.length() - 1) != '/') { + dest1 = dest + "/" + filename; + } else { + dest1 = dest + filename; + } + + Path path = new Path(dest1); + if (fileSystem.exists(path)) { + + } + FSDataOutputStream out = fileSystem.create(path); + + InputStream in = new BufferedInputStream( + new FileInputStream(new File(source))); + + byte[] b = new byte[1024]; + int numBytes = 0; + while ((numBytes = in.read(b)) > 0) { + out.write(b, 0, numBytes); + } + in.close(); + out.close(); + fileSystem.close(); + return null; + } + }); + } catch (Exception e) { + logger.error("Webhdfs exception" , e); + } + + } + + + public void putFileinHdfsSecured(final String source, final String dest, + final String namenodeuri) throws IOException { + + try { + final Configuration conf = new Configuration(); + + conf.set("fs.hdfs.impl", + org.apache.hadoop.hdfs.DistributedFileSystem.class.getName() + ); + conf.set("fs.file.impl", + org.apache.hadoop.fs.LocalFileSystem.class.getName() + ); + conf.set("fs.defaultFS", namenodeuri); + conf.set("hadoop.job.ugi", "hdfs"); + conf.set("hadoop.security.authentication", "Kerberos"); + + UserGroupInformation.setConfiguration(conf); + UserGroupInformation ugi = UserGroupInformation.createRemoteUser("hdfs"); + ugi.doAs(new PrivilegedExceptionAction<Void>() { + + public Void run() throws Exception { + + + FileSystem fileSystem = FileSystem.get(conf); + + String filename = source.substring( + source.lastIndexOf('/') + 1, source.length()); + String dest1; + if (dest.charAt(dest.length() - 1) != '/') { + dest1 = dest + "/" + filename; + } else { + dest1 = dest + filename; + } + + Path path = new Path(dest1); + if (fileSystem.exists(path)) { + + } + // Path pathsource = new Path(source); + FSDataOutputStream out = fileSystem.create(path); + + InputStream in = new BufferedInputStream( + new FileInputStream(new File(source))); + + byte[] b = new byte[1024]; + int numBytes = 0; + while ((numBytes = in.read(b)) > 0) { + out.write(b, 0, numBytes); + } + in.close(); + out.close(); + fileSystem.close(); + + + return null; + } + }); + } catch (Exception e) { + logger.error("Webhdfs exception" , e); + } + + } + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/af9654ff/contrib/views/hueambarimigration/src/main/java/org/apache/ambari/view/huetoambarimigration/service/pig/PigJobImpl.java ---------------------------------------------------------------------- diff --git a/contrib/views/hueambarimigration/src/main/java/org/apache/ambari/view/huetoambarimigration/service/pig/PigJobImpl.java b/contrib/views/hueambarimigration/src/main/java/org/apache/ambari/view/huetoambarimigration/service/pig/PigJobImpl.java new file mode 100644 index 0000000..614c171 --- /dev/null +++ b/contrib/views/hueambarimigration/src/main/java/org/apache/ambari/view/huetoambarimigration/service/pig/PigJobImpl.java @@ -0,0 +1,563 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.view.huetoambarimigration.service.pig; + +import java.nio.charset.Charset; +import java.security.PrivilegedExceptionAction; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.io.BufferedInputStream; +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.ByteArrayInputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileWriter; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.util.ArrayList; +import java.util.Calendar; +import java.util.Date; +import java.util.GregorianCalendar; +import java.util.Scanner; +import java.io.*; +import java.net.URISyntaxException; +import java.net.URL; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.log4j.Logger; +import org.jdom.Attribute; +import org.jdom.Document; +import org.jdom.Element; +import org.jdom.JDOMException; +import org.jdom.input.SAXBuilder; +import org.jdom.output.Format; +import org.jdom.output.XMLOutputter; +import org.json.JSONArray; +import org.json.JSONObject; + +import org.apache.ambari.view.huetoambarimigration.model.*; +import org.apache.ambari.view.huetoambarimigration.service.configurationcheck.ConfFileReader; + +public class PigJobImpl { + + static final Logger logger = Logger.getLogger(PigJobImpl.class); + + private static String readAll(Reader rd) throws IOException { + StringBuilder sb = new StringBuilder(); + int cp; + while ((cp = rd.read()) != -1) { + sb.append((char) cp); + } + return sb.toString(); + } + + public void wrtitetoalternatesqlfile(String dirname, String content, String instance, int i) throws IOException { + Date dNow = new Date(); + SimpleDateFormat ft = new SimpleDateFormat("YYYY-MM-dd hh:mm:ss"); + String currentDate = ft.format(dNow); + XMLOutputter xmlOutput = new XMLOutputter(); + xmlOutput.setFormat(Format.getPrettyFormat()); + File xmlfile = new File(ConfFileReader.getHomeDir() + "RevertChange.xml"); + if (xmlfile.exists()) { + String iteration = Integer.toString(i + 1); + SAXBuilder builder = new SAXBuilder(); + Document doc; + try { + doc = (Document) builder.build(xmlfile); + Element rootNode = doc.getRootElement(); + Element record = new Element("RevertRecord"); + record.setAttribute(new Attribute("id", iteration)); + record.addContent(new Element("datetime").setText(currentDate.toString())); + record.addContent(new Element("dirname").setText(dirname)); + record.addContent(new Element("instance").setText(instance)); + record.addContent(new Element("query").setText(content)); + rootNode.addContent(record); + xmlOutput.output(doc, new FileWriter(ConfFileReader.getHomeDir() + "RevertChange.xml")); + } catch (JDOMException e) { + + logger.error("Jdom Exception: " , e); + } + + + } else { + // create + try { + String iteration = Integer.toString(i + 1); + Element revertrecord = new Element("RevertChangePage"); + Document doc = new Document(revertrecord); + doc.setRootElement(revertrecord); + Element record = new Element("RevertRecord"); + record.setAttribute(new Attribute("id", iteration)); + record.addContent(new Element("datetime").setText(currentDate.toString())); + record.addContent(new Element("dirname").setText(dirname)); + record.addContent(new Element("instance").setText(instance)); + record.addContent(new Element("query").setText(content)); + doc.getRootElement().addContent(record); + xmlOutput.output(doc, new FileWriter(ConfFileReader.getHomeDir() + "RevertChange.xml")); + } catch (IOException io) { + logger.error("Jdom Exception: " , io); + } + + } + + } + + public int fetchMaxIdforPigJob(String driverName, Connection c, int id) throws SQLException { + + String ds_id = null; + Statement stmt = null; + ResultSet rs = null; + + stmt = c.createStatement(); + + if (driverName.contains("postgresql")) { + rs = stmt.executeQuery("select MAX(cast(ds_id as integer)) as max from ds_pigjob_" + id + ";"); + } else if (driverName.contains("mysql")) { + rs = stmt.executeQuery("select max( cast(ds_id as unsigned) ) as max from DS_PIGJOB_" + id + ";"); + } else if (driverName.contains("oracle")) { + rs = stmt.executeQuery("select MAX(cast(ds_id as integer)) as max from ds_pigjob_" + id); + } + + while (rs.next()) { + ds_id = rs.getString("max"); + + } + + int num; + if (ds_id == null) { + num = 1; + } else { + num = Integer.parseInt(ds_id); + } + + return num; + + } + + public int fetchInstanceTablename(String driverName, Connection c, String instance) throws SQLException { + + + String ds_id = new String(); + int id = 0; + Statement stmt = null; + stmt = c.createStatement(); + + ResultSet rs = null; + if (driverName.contains("oracle")) { + rs = stmt.executeQuery("select id from viewentity where class_name LIKE 'org.apache.ambari.view.pig.resources.jobs.models.PigJob' and view_instance_name='" + instance + "'"); + } else { + rs = stmt.executeQuery("select id from viewentity where class_name LIKE 'org.apache.ambari.view.pig.resources.jobs.models.PigJob' and view_instance_name='" + instance + "';"); + } + while (rs.next()) { + id = rs.getInt("id"); + + } + + return id; + } + + public void insertRowPigJob(String driverName, String dirname, int maxcountforpigjob, String time, String time2, long epochtime, String title, Connection c, int id, String status, String instance, int i) throws SQLException, IOException { + + String epochtime1 = Long.toString(epochtime); + String ds_id = new String(); + Statement stmt = null; + + stmt = c.createStatement(); + String sql = ""; + String revsql = ""; + + if (driverName.contains("mysql")) { + sql = "INSERT INTO DS_PIGJOB_" + id + " values ('" + maxcountforpigjob + "'," + epochtime1 + ",0,'','f','','','admin',0,'" + dirname + "script.pig','','" + maxcountforpigjob + "','','','" + status + "','" + dirname + "','','" + title + "');"; + revsql = "delete from DS_PIGJOB_" + id + " where ds_id='" + maxcountforpigjob + "';"; + + } else if (driverName.contains("postgresql")) { + sql = "INSERT INTO ds_pigjob_" + id + " values ('" + maxcountforpigjob + "'," + epochtime1 + ",0,'','f','','','admin',0,'" + dirname + "script.pig','','" + maxcountforpigjob + "','','','" + status + "','" + dirname + "','','" + title + "');"; + revsql = "delete from ds_pigjob_" + id + " where ds_id='" + maxcountforpigjob + "';"; + + } else if (driverName.contains("oracle")) { + sql = "INSERT INTO ds_pigjob_" + id + " values ('" + maxcountforpigjob + "'," + epochtime1 + ",0,'','f','','','admin',0,'" + dirname + "script.pig','','" + maxcountforpigjob + "','','','" + status + "','" + dirname + "','','" + title + "')"; + revsql = "delete from ds_pigjob_" + id + " where ds_id='" + maxcountforpigjob + "'"; + + } + + wrtitetoalternatesqlfile(dirname, revsql, instance, i); + + stmt.executeUpdate(sql); + + } + + public long getEpochTime() throws ParseException { + int day, month, year; + int second, minute, hour; + int milisecond; + GregorianCalendar date = new GregorianCalendar(); + + day = date.get(Calendar.DAY_OF_MONTH); + month = date.get(Calendar.MONTH); + year = date.get(Calendar.YEAR); + + second = date.get(Calendar.SECOND); + minute = date.get(Calendar.MINUTE); + hour = date.get(Calendar.HOUR); + milisecond = date.get(Calendar.MILLISECOND); + String s1 = year + "-" + (month + 1) + "-" + day + "_" + hour + "-" + minute + "-" + second + "-" + milisecond; + SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss-SSS"); + Date date1 = df.parse(s1); + long epoch = date1.getTime(); + return epoch; + + } + + public String getTime() throws ParseException { + int day, month, year; + int second, minute, hour; + int milisecond; + GregorianCalendar date = new GregorianCalendar(); + + day = date.get(Calendar.DAY_OF_MONTH); + month = date.get(Calendar.MONTH); + year = date.get(Calendar.YEAR); + + second = date.get(Calendar.SECOND); + minute = date.get(Calendar.MINUTE); + hour = date.get(Calendar.HOUR); + milisecond = date.get(Calendar.MILLISECOND); + String s = year + "-" + (month + 1) + "-" + day + "_" + hour + "-" + minute; + String s1 = year + "-" + (month + 1) + "-" + day + "_" + hour + "-" + minute + "-" + second + "-" + milisecond; + SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss-SSS"); + Date date1 = df.parse(s1); + long epoch = date1.getTime(); + return s; + + } + + public String getTimeInorder() throws ParseException { + SimpleDateFormat sdfDate = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.msssss +00:00:00");//dd/MM/yyyy + Date now = new Date(); + String strDate = sdfDate.format(now); + return strDate; + } + + public ArrayList<PojoPig> fetchFromHueDB(String username, String startdate, String endtime, Connection connection) throws ClassNotFoundException, IOException { + int id = 0; + int i = 0; + String[] query = new String[100]; + ArrayList<PojoPig> pigjobarraylist = new ArrayList<PojoPig>(); + try { + Statement statement = connection.createStatement(); + ResultSet rs1 = null; + if (username.equals("all")) { + } else { + ResultSet rs = statement + .executeQuery("select id from auth_user where username='" + + username + "';"); + while (rs.next()) { + + id = rs.getInt("id"); + + } + + } + + if (startdate.equals("") && endtime.equals("")) { + if (username.equals("all")) { + + rs1 = statement.executeQuery("select status,start_time,statusdir,script_title,user_id from pig_job;"); + + } else { + + rs1 = statement.executeQuery("select status,start_time,statusdir,script_title,user_id from pig_job where user_id =" + id + ";"); + } + + } else if (!(startdate.equals("")) && !(endtime.equals(""))) { + if (username.equals("all")) { + + rs1 = statement.executeQuery("select status,start_time,statusdir,script_title,user_id from pig_job where start_time >= date('" + startdate + "') AND start_time <= date('" + endtime + "');"); + } else { + + rs1 = statement.executeQuery("select status,start_time,statusdir,script_title,user_id from pig_job where user_id =" + id + " AND start_time >= date('" + startdate + "') AND start_time <= date('" + endtime + "');"); + } + + } else if (!(startdate.equals("")) && (endtime.equals(""))) { + if (username.equals("all")) { + + rs1 = statement.executeQuery("select status,start_time,statusdir,script_title,user_id from pig_job where start_time >= date('" + startdate + "');"); + } else { + + rs1 = statement.executeQuery("select status,start_time,statusdir,script_title,user_id from pig_job where user_id =" + id + " AND start_time >= date('" + startdate + "');"); + } + + } else if ((startdate.equals("")) && !(endtime.equals(""))) { + if (username.equals("all")) { + + rs1 = statement.executeQuery("select status,start_time,statusdir,script_title,user_id from pig_job where start_time <= date('" + endtime + "');"); + } else { + + rs1 = statement.executeQuery("select status,start_time,statusdir,script_title,user_id from pig_job where user_id =" + id + " AND start_time <= date('" + endtime + "');"); + } + + } + + while (rs1.next()) { + PojoPig pigjjobobject = new PojoPig(); + + int runstatus = rs1.getInt("status"); + + if (runstatus == 1) { + pigjjobobject.setStatus("RUNNING"); + } else if (runstatus == 2) { + pigjjobobject.setStatus("SUCCEEDED"); + } else if (runstatus == 3) { + pigjjobobject.setStatus("SUBMIT_FAILED"); + } else if (runstatus == 4) { + pigjjobobject.setStatus("KILLED"); + } + String title = rs1.getString("script_title"); + + + pigjjobobject.setTitle(title); + String dir = rs1.getString("statusdir"); + pigjjobobject.setDir(dir); + Date created_data = rs1.getDate("start_time"); + pigjjobobject.setDt(created_data); + + pigjobarraylist.add(pigjjobobject); + + i++; + } + + + } catch (SQLException e) { + logger.error("Sqlexception: " , e); + } finally { + try { + if (connection != null) + connection.close(); + } catch (SQLException e) { + logger.error("Sqlexception in closing the connection: " , e); + + } + } + + return pigjobarraylist; + + } + + public void createDirPigJob(final String dir, final String namenodeuri) throws IOException, + URISyntaxException { + + try { + UserGroupInformation ugi = UserGroupInformation + .createRemoteUser("hdfs"); + + ugi.doAs(new PrivilegedExceptionAction<Void>() { + + public Void run() throws Exception { + + Configuration conf = new Configuration(); + conf.set("fs.hdfs.impl", + org.apache.hadoop.hdfs.DistributedFileSystem.class.getName() + ); + conf.set("fs.file.impl", + org.apache.hadoop.fs.LocalFileSystem.class.getName() + ); + conf.set("fs.defaultFS", namenodeuri); + conf.set("hadoop.job.ugi", "hdfs"); + + FileSystem fs = FileSystem.get(conf); + Path src = new Path(dir); + fs.mkdirs(src); + return null; + } + }); + } catch (Exception e) { + logger.error("Webhdfs exception: " , e); + } + } + + /**/ + public void createDirPigJobSecured(final String dir, final String namenodeuri) throws IOException, + URISyntaxException { + + try { + final Configuration conf = new Configuration(); + + conf.set("fs.hdfs.impl", + org.apache.hadoop.hdfs.DistributedFileSystem.class.getName() + ); + conf.set("fs.file.impl", + org.apache.hadoop.fs.LocalFileSystem.class.getName() + ); + conf.set("fs.defaultFS", namenodeuri); + conf.set("hadoop.job.ugi", "hdfs"); + conf.set("hadoop.security.authentication", "Kerberos"); + + UserGroupInformation.setConfiguration(conf); + UserGroupInformation ugi = UserGroupInformation.createRemoteUser("hdfs"); + + ugi.doAs(new PrivilegedExceptionAction<Void>() { + + public Void run() throws Exception { + + + FileSystem fs = FileSystem.get(conf); + Path src = new Path(dir); + fs.mkdirs(src); + return null; + } + }); + } catch (Exception e) { + logger.error("Webhdfs exception: " , e); + } + } + + /**/ + public void copyFileBetweenHdfs(final String source, final String dest, final String nameNodeuriAmbari, final String nameNodeuriHue) + throws IOException { + + try { + UserGroupInformation ugi = UserGroupInformation + .createRemoteUser("hdfs"); + + ugi.doAs(new PrivilegedExceptionAction<Void>() { + + public Void run() throws Exception { + + Configuration confAmbari = new Configuration(); + confAmbari.set("fs.defaultFS", nameNodeuriAmbari); + confAmbari.set("hadoop.job.ugi", "hdfs"); + FileSystem fileSystemAmbari = FileSystem.get(confAmbari); + + Configuration confHue = new Configuration(); + confHue.set("fs.defaultFS", nameNodeuriAmbari); + confHue.set("hadoop.job.ugi", "hdfs"); + FileSystem fileSystemHue = FileSystem.get(confHue); + + String filename = source.substring( + source.lastIndexOf('/') + 1, source.length()); + String dest1; + if (dest.charAt(dest.length() - 1) != '/') { + dest1 = dest + "/" + filename; + } else { + dest1 = dest + filename; + } + + Path path1 = new Path(source); + FSDataInputStream in1 = fileSystemHue.open(path1); + + Path path = new Path(dest1); + if (fileSystemAmbari.exists(path)) { + + } + + FSDataOutputStream out = fileSystemAmbari.create(path); + + byte[] b = new byte[1024]; + int numBytes = 0; + while ((numBytes = in1.read(b)) > 0) { + out.write(b, 0, numBytes); + } + in1.close(); + out.close(); + fileSystemAmbari.close(); + return null; + } + }); + } catch (Exception e) { + logger.error("Webhdfs exception: " , e); + } + + } + + /**/ + public void copyFileBetweenHdfsSecured(final String source, final String dest, final String nameNodeuriAmbari, final String nameNodeuriHue) + throws IOException { + + try { + + final Configuration confAmbari = new Configuration(); + confAmbari.set("fs.defaultFS", nameNodeuriAmbari); + confAmbari.set("hadoop.job.ugi", "hdfs"); + + final Configuration confHue = new Configuration(); + confHue.set("fs.defaultFS", nameNodeuriAmbari); + confHue.set("hadoop.job.ugi", "hdfs"); + + confAmbari.set("hadoop.security.authentication", "Kerberos"); + confHue.set("hadoop.security.authentication", "Kerberos"); + + UserGroupInformation ugi = UserGroupInformation + .createRemoteUser("hdfs"); + + ugi.doAs(new PrivilegedExceptionAction<Void>() { + + public Void run() throws Exception { + + + FileSystem fileSystemAmbari = FileSystem.get(confAmbari); + + FileSystem fileSystemHue = FileSystem.get(confHue); + + String filename = source.substring( + source.lastIndexOf('/') + 1, source.length()); + String dest1; + if (dest.charAt(dest.length() - 1) != '/') { + dest1 = dest + "/" + filename; + } else { + dest1 = dest + filename; + } + + Path path1 = new Path(source); + FSDataInputStream in1 = fileSystemHue.open(path1); + + Path path = new Path(dest1); + if (fileSystemAmbari.exists(path)) { + + } + FSDataOutputStream out = fileSystemAmbari.create(path); + byte[] b = new byte[1024]; + int numBytes = 0; + while ((numBytes = in1.read(b)) > 0) { + out.write(b, 0, numBytes); + } + in1.close(); + out.close(); + fileSystemAmbari.close(); + return null; + } + }); + } catch (Exception e) { + logger.error("Webhdfs exception: " , e); + } + + } + +}
