AMBARI-15046. Hive view Upload Table feature now supports uploading from HDFS and from the local machine. Supports JSON/XML/CSV as uploaded file formats, and ORC and all other Hive internal storage types as the storage format of the created table. (Nitiraj Rathore via pallavkul)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/0747b6c7 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/0747b6c7 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/0747b6c7 Branch: refs/heads/branch-2.2 Commit: 0747b6c77020b9d42551fe0437968b7658148b37 Parents: fc8f637 Author: Pallav Kulshreshtha <[email protected]> Authored: Fri Feb 26 15:28:24 2016 +0530 Committer: Pallav Kulshreshtha <[email protected]> Committed: Fri Feb 26 15:28:24 2016 +0530 ---------------------------------------------------------------------- contrib/views/hive/pom.xml | 5 + .../org/apache/ambari/view/hive/client/Row.java | 4 +- .../view/hive/resources/jobs/JobService.java | 17 + .../view/hive/resources/uploads/CSVParser.java | 185 ------ .../uploads/ColumnDescriptionImpl.java | 55 +- .../view/hive/resources/uploads/DataParser.java | 63 -- .../hive/resources/uploads/HiveFileType.java | 30 + .../view/hive/resources/uploads/IParser.java | 37 -- .../hive/resources/uploads/ParseOptions.java | 46 -- .../view/hive/resources/uploads/ParseUtils.java | 103 ---- .../hive/resources/uploads/QueryGenerator.java | 66 -- .../hive/resources/uploads/TableDataReader.java | 86 +++ .../view/hive/resources/uploads/TableInfo.java | 62 -- .../view/hive/resources/uploads/TableInput.java | 90 +++ .../resources/uploads/UploadFromHdfsInput.java | 91 +++ .../hive/resources/uploads/UploadService.java | 462 ++++++++------ .../resources/uploads/parsers/DataParser.java | 72 +++ .../uploads/parsers/EndOfDocumentException.java | 41 ++ .../hive/resources/uploads/parsers/IParser.java | 48 ++ .../resources/uploads/parsers/ParseOptions.java | 47 ++ .../resources/uploads/parsers/ParseUtils.java | 134 +++++ .../hive/resources/uploads/parsers/Parser.java | 154 +++++ .../resources/uploads/parsers/PreviewData.java | 56 ++ .../resources/uploads/parsers/RowIterator.java | 96 +++ .../uploads/parsers/RowMapIterator.java | 29 + 
.../uploads/parsers/csv/CSVIterator.java | 57 ++ .../uploads/parsers/csv/CSVParser.java | 55 ++ .../uploads/parsers/json/JSONIterator.java | 160 +++++ .../uploads/parsers/json/JSONParser.java | 85 +++ .../uploads/parsers/xml/XMLIterator.java | 195 ++++++ .../uploads/parsers/xml/XMLParser.java | 100 ++++ .../uploads/query/DeleteQueryInput.java | 48 ++ .../uploads/query/InsertFromQueryInput.java | 68 +++ .../resources/uploads/query/LoadQueryInput.java | 67 +++ .../resources/uploads/query/QueryGenerator.java | 98 +++ .../hive/resources/uploads/query/TableInfo.java | 83 +++ .../ui/hive-web/app/adapters/upload-table.js | 67 ++- .../ui/hive-web/app/components/input-header.js | 61 ++ .../ui/hive-web/app/components/radio-button.js | 39 ++ .../ui/hive-web/app/controllers/upload-table.js | 598 +++++++++++++++---- .../ui/hive-web/app/initializers/i18n.js | 1 + .../resources/ui/hive-web/app/services/job.js | 15 + .../resources/ui/hive-web/app/styles/app.scss | 59 +- .../app/templates/components/input-header.hbs | 20 + .../ui/hive-web/app/templates/upload-table.hbs | 202 +++++-- .../hive/resources/upload/CSVParserTest.java | 144 +++++ .../resources/upload/DataParserCSVTest.java | 214 +++++++ .../resources/upload/DataParserJSONTest.java | 209 +++++++ .../hive/resources/upload/DataParserTest.java | 65 -- .../resources/upload/DataParserXMLTest.java | 233 ++++++++ .../hive/resources/upload/JsonParserTest.java | 184 ++++++ .../resources/upload/QueryGeneratorTest.java | 84 +++ .../resources/upload/TableDataReaderTest.java | 127 ++++ .../hive/resources/upload/XMLParserTest.java | 180 ++++++ 54 files changed, 4598 insertions(+), 999 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/0747b6c7/contrib/views/hive/pom.xml ---------------------------------------------------------------------- diff --git a/contrib/views/hive/pom.xml b/contrib/views/hive/pom.xml index 9f3f1f7..9ad4296 100644 --- 
a/contrib/views/hive/pom.xml +++ b/contrib/views/hive/pom.xml @@ -221,6 +221,11 @@ <artifactId>httpcore</artifactId> <version>4.4.3</version> </dependency> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-csv</artifactId> + <version>1.1</version> + </dependency> </dependencies> <properties> http://git-wip-us.apache.org/repos/asf/ambari/blob/0747b6c7/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/client/Row.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/client/Row.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/client/Row.java index 35f216b..cfce1f0 100644 --- a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/client/Row.java +++ b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/client/Row.java @@ -56,8 +56,8 @@ public class Row { Row row1 = (Row) o; - // Probably incorrect - comparing Object[] arrays with Arrays.equals - return Arrays.equals(row, row1.row); + boolean retValue = Arrays.equals(row, row1.row); + return retValue; } @Override http://git-wip-us.apache.org/repos/asf/ambari/blob/0747b6c7/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/JobService.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/JobService.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/JobService.java index fd69893..f7f883b 100644 --- a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/JobService.java +++ b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/JobService.java @@ -259,6 +259,23 @@ public class JobService extends BaseService { } } + + @Path("{jobId}/status") + @GET + @Consumes(MediaType.APPLICATION_JSON) + @Produces(MediaType.APPLICATION_JSON) + public Response 
fetchJobStatus(@PathParam("jobId") String jobId) throws ItemNotFound, HiveClientException, NoOperationStatusSetException { + JobController jobController = getResourceManager().readController(jobId); + String jobStatus = jobController.getStatus().status; + LOG.info("jobStatus : {} for jobId : {}",jobStatus, jobId); + + JSONObject jsonObject = new JSONObject(); + jsonObject.put("jobStatus", jobStatus); + jsonObject.put("jobId", jobId); + + return Response.ok(jsonObject).build(); + } + /** * Get next results page */ http://git-wip-us.apache.org/repos/asf/ambari/blob/0747b6c7/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/CSVParser.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/CSVParser.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/CSVParser.java deleted file mode 100644 index 388cf53..0000000 --- a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/CSVParser.java +++ /dev/null @@ -1,185 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.ambari.view.hive.resources.uploads; - -import org.apache.ambari.view.hive.client.ColumnDescription; -import org.apache.ambari.view.hive.client.ColumnDescriptionShort; -import org.apache.ambari.view.hive.client.Row; -import org.apache.commons.csv.CSVFormat; -import org.apache.commons.csv.CSVRecord; - -import java.io.*; -import java.util.*; - -/** - * Parses the given Reader and extracts headers and rows, and detect datatypes of columns - */ -public class CSVParser implements IParser { - - static class CSVIterator implements Iterator<Row> { - - private Iterator<CSVRecord> iterator; - - public CSVIterator(Iterator<CSVRecord> iterator) { - this.iterator = iterator; - } - - @Override - public boolean hasNext() { - return iterator.hasNext(); - } - - @Override - public Row next() { - CSVRecord row = iterator.next(); - Object[] values = new Object[row.size()]; - for (int i = 0; i < values.length; i++) { - values[i] = row.get(i); - } - Row r = new Row(values); - return r; - } - - @Override - public void remove() { - this.iterator.remove(); - } - } - - private Reader originalReader; // same as CSV reader in this case - private ParseOptions parseOptions; - private CSVIterator iterator; - private List<Row> previewRows; - private List<ColumnDescription> header; - private boolean isHeaderFirstRow = false; - private int numberOfPreviewRows = 10; - private org.apache.commons.csv.CSVParser parser; - - public CSVParser(Reader reader, ParseOptions parseOptions) throws IOException { - this.originalReader = reader; - this.parseOptions = parseOptions; - // always create without headers - parser = new org.apache.commons.csv.CSVParser(reader, CSVFormat.EXCEL); - iterator = new CSVIterator(parser.iterator()); - } - - public void parsePreview() { - try { - numberOfPreviewRows = (Integer) parseOptions.getOption(ParseOptions.OPTIONS_NUMBER_OF_PREVIEW_ROWS); - } catch (Exception e) { - } - - int numberOfRows = numberOfPreviewRows; - previewRows = new 
ArrayList<Row>(numberOfPreviewRows); // size including the header. - - Row headerRow = null; - if (parseOptions.getOption(ParseOptions.OPTIONS_HEADER).equals(ParseOptions.HEADER_FIRST_RECORD)) { - if (!this.iterator().hasNext()) { - throw new NoSuchElementException("Cannot parse Header"); - } - isHeaderFirstRow = true; - headerRow = iterator().next(); - previewRows.add(headerRow); - } - - // find data types. - int[][] typeCounts = null; - Row r = null; - int numOfCols = 0; - if (iterator().hasNext()) { - r = iterator().next(); - numOfCols = r.getRow().length; - typeCounts = new int[numOfCols][ColumnDescription.DataTypes.values().length]; - } else { - throw new NoSuchElementException("No rows in the csv."); - } - - while (true) { - // create Header definition from row - Object[] values = r.getRow(); - previewRows.add(r); - - if (values.length != numOfCols) - throw new IllegalArgumentException("Illegal number of cols for row : " + r); - - for (int colNum = 0; colNum < values.length; colNum++) { - // detect type - ColumnDescription.DataTypes type = ParseUtils.detectHiveDataType(values[colNum]); - typeCounts[colNum][type.ordinal()]++; - } - numberOfRows--; - if (numberOfRows <= 0 || !iterator().hasNext()) - break; - - r = iterator().next(); - } - ; - - if (previewRows.size() <= 0) - throw new NoSuchElementException("Does not contain any rows."); - - header = new ArrayList<ColumnDescription>(numOfCols); - for (int colNum = 0; colNum < numOfCols; colNum++) { - int dataTypeId = getLikelyDataType(typeCounts, colNum); - ColumnDescription.DataTypes type = ColumnDescription.DataTypes.values()[dataTypeId]; - String colName = "Column" + colNum; - if (null != headerRow) - colName = (String) headerRow.getRow()[colNum]; - - ColumnDescription cd = new ColumnDescriptionImpl(colName, type.toString(), colNum); - header.add(cd); - } - } - - /** - * returns which datatype was detected for the maximum number of times in the given column - * @param typeCounts - * @param colNum - * @return 
- */ - private int getLikelyDataType(int[][] typeCounts, int colNum) { - int[] colArray = typeCounts[colNum]; - int maxIndex = 0; - int i = 1; - for (; i < colArray.length; i++) { - if (colArray[i] > colArray[maxIndex]) - maxIndex = i; - } - - return maxIndex; - } - - @Override - public Reader getCSVReader() { - return originalReader; - } - - @Override - public List<ColumnDescription> getHeader() { - return header; - } - - @Override - public List<Row> getPreviewRows() { - return this.previewRows; - } - - public Iterator<Row> iterator() { - return iterator; // only one iterator per parser. - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/0747b6c7/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/ColumnDescriptionImpl.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/ColumnDescriptionImpl.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/ColumnDescriptionImpl.java index 50f5036..229b7ed 100644 --- a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/ColumnDescriptionImpl.java +++ b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/ColumnDescriptionImpl.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * <p/> * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p/> * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -22,10 +22,24 @@ import org.apache.ambari.view.hive.client.ColumnDescription; import java.io.Serializable; +/** + * implementation of ColumnDescription which also includes scale and precision. 
+ */ public class ColumnDescriptionImpl implements ColumnDescription, Serializable { private String name; private String type; private int position; + /** + * can be null + */ + private Integer precision; + /** + * can be null + */ + private Integer scale; + + public ColumnDescriptionImpl() { + } public ColumnDescriptionImpl(String name, String type, int position) { this.name = name; @@ -33,6 +47,21 @@ public class ColumnDescriptionImpl implements ColumnDescription, Serializable { this.position = position; } + public ColumnDescriptionImpl(String name, String type, int position, int precision) { + this.name = name; + this.type = type; + this.position = position; + this.precision = precision; + } + + public ColumnDescriptionImpl(String name, String type, int position, int precision, int scale) { + this.name = name; + this.type = type; + this.position = position; + this.precision = precision; + this.scale = scale; + } + @Override public String getName() { return name; @@ -63,6 +92,22 @@ public class ColumnDescriptionImpl implements ColumnDescription, Serializable { this.position = position; } + public Integer getPrecision() { + return precision; + } + + public Integer getScale() { + return scale; + } + + public void setPrecision(Integer precision) { + this.precision = precision; + } + + public void setScale(Integer scale) { + this.scale = scale; + } + @Override public boolean equals(Object o) { if (this == o) return true; @@ -88,8 +133,10 @@ public class ColumnDescriptionImpl implements ColumnDescription, Serializable { public String toString() { return new StringBuilder().append("ColumnDescriptionImpl[") .append("name : ").append(name) - .append("type : " + type) - .append("position : " + position) + .append(", type : " + type) + .append(", position : " + position) + .append(", precision : " + precision) + .append(", scale : " + scale) .append("]").toString(); } } 
http://git-wip-us.apache.org/repos/asf/ambari/blob/0747b6c7/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/DataParser.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/DataParser.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/DataParser.java deleted file mode 100644 index 5f2db55..0000000 --- a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/DataParser.java +++ /dev/null @@ -1,63 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ambari.view.hive.resources.uploads; - -import org.apache.ambari.view.hive.client.ColumnDescription; -import org.apache.ambari.view.hive.client.Row; - -import java.io.IOException; -import java.io.Reader; -import java.util.Iterator; -import java.util.List; - -public class DataParser implements IParser { - - private IParser parser; - - public DataParser(Reader reader, ParseOptions parseOptions) throws IOException { - if (parseOptions.getOption(ParseOptions.OPTIONS_FILE_TYPE).equals(ParseOptions.FILE_TYPE_CSV)) { - parser = new CSVParser(reader, parseOptions); - } - } - - @Override - public Reader getCSVReader() { - return parser.getCSVReader(); - } - - @Override - public List<ColumnDescription> getHeader() { - return parser.getHeader(); - } - - @Override - public List<Row> getPreviewRows() { - return parser.getPreviewRows(); - } - - @Override - public void parsePreview() { - parser.parsePreview(); - } - - @Override - public Iterator<Row> iterator() { - return parser.iterator(); - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/0747b6c7/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/HiveFileType.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/HiveFileType.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/HiveFileType.java new file mode 100644 index 0000000..6cc1d46 --- /dev/null +++ b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/HiveFileType.java @@ -0,0 +1,30 @@ + +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.view.hive.resources.uploads; + +public enum HiveFileType { + SEQUENCEFILE, + TEXTFILE, + RCFILE, + ORC, + PARQUET, + AVRO, + INPUTFORMAT; +} http://git-wip-us.apache.org/repos/asf/ambari/blob/0747b6c7/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/IParser.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/IParser.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/IParser.java deleted file mode 100644 index c478b70..0000000 --- a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/IParser.java +++ /dev/null @@ -1,37 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ambari.view.hive.resources.uploads; - -import org.apache.ambari.view.hive.client.ColumnDescription; -import org.apache.ambari.view.hive.client.Row; - -import java.io.File; -import java.io.InputStream; -import java.io.Reader; -import java.util.List; - -public interface IParser extends Iterable<Row> { - public Reader getCSVReader(); - - public List<ColumnDescription> getHeader(); - - public List<Row> getPreviewRows(); - - public void parsePreview(); -} http://git-wip-us.apache.org/repos/asf/ambari/blob/0747b6c7/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/ParseOptions.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/ParseOptions.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/ParseOptions.java deleted file mode 100644 index 2ec3b1b..0000000 --- a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/ParseOptions.java +++ /dev/null @@ -1,46 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ambari.view.hive.resources.uploads; - -import java.util.HashMap; - -public class ParseOptions { - final public static String OPTIONS_FILE_TYPE = "FILE_TYPE"; - final public static String OPTIONS_HEADER = "HEADER"; - final public static String OPTIONS_NUMBER_OF_PREVIEW_ROWS = "NUMBER_OF_PREVIEW_ROWS"; - - final public static String FILE_TYPE_CSV = "CSV"; - final public static String FILE_TYPE_JSON = "JSON"; - final public static String XML = "XML"; - - final public static String HEADER_FIRST_RECORD = "FIRST_RECORD"; - final public static String HEADER_PROVIDED_BY_USER = "PROVIDED_BY_USER"; - - final public static String HEADERS = "HEADERS"; - - private HashMap<String, Object> options = new HashMap<String, Object>(); - - public void setOption(String key, Object value) { - this.options.put(key, value); - } - - public Object getOption(String key) { - return this.options.get(key); - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/0747b6c7/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/ParseUtils.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/ParseUtils.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/ParseUtils.java deleted file mode 100644 index aea370e..0000000 --- a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/ParseUtils.java +++ /dev/null @@ -1,103 +0,0 @@ -/** - * Licensed to the Apache Software 
Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ambari.view.hive.resources.uploads; - -import org.apache.ambari.view.hive.client.ColumnDescription; - -import java.text.ParseException; -import java.text.SimpleDateFormat; -import java.util.Date; - -public class ParseUtils { - - final public static String[] DATE_FORMATS = {"mm/dd/yyyy", "dd/mm/yyyy", "mm-dd-yyyy" /*add more formatss*/}; - - public static boolean isInteger(Object object) { - if (object == null) - return false; - - if (object instanceof Integer) - return true; - - try { - Integer i = Integer.parseInt(object.toString()); - return true; - } catch (NumberFormatException nfe) { - return false; - } - } - - public static boolean isDouble(Object object) { - if (object == null) - return false; - - if (object instanceof Double) - return true; - - try { - Double i = Double.parseDouble(object.toString()); - return true; - } catch (NumberFormatException nfe) { - return false; - } - } - - public static boolean isChar(Object object) { - if (object == null) - return false; - - if (object instanceof Character) - return true; - - String str = object.toString().trim(); - if (str.length() == 1) - return true; - - return false; - } - - public static boolean isDate(Object object) { - if 
(object == null) - return false; - - if (object instanceof Date) - return true; - - String str = object.toString(); - for (String format : DATE_FORMATS) { - try { - Date i = new SimpleDateFormat(format).parse(str); - return true; - } catch (Exception e) { - } - } - - return false; - } - - public static ColumnDescription.DataTypes detectHiveDataType(Object object) { - // detect Integer - if (isInteger(object)) return ColumnDescription.DataTypes.INT; - if (isDouble(object)) return ColumnDescription.DataTypes.DOUBLE; - if (isDate(object)) return ColumnDescription.DataTypes.DATE; - if (isChar(object)) return ColumnDescription.DataTypes.CHAR; - - return ColumnDescription.DataTypes.STRING; - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/0747b6c7/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/QueryGenerator.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/QueryGenerator.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/QueryGenerator.java deleted file mode 100644 index 98616cf..0000000 --- a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/QueryGenerator.java +++ /dev/null @@ -1,66 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ambari.view.hive.resources.uploads; - -import org.apache.ambari.view.hive.client.ColumnDescription; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Collections; -import java.util.Comparator; -import java.util.List; - -/** - * generates the sql query from given data - */ -public class QueryGenerator { - protected final static Logger LOG = - LoggerFactory.getLogger(QueryGenerator.class); - - public String generateCreateQuery(TableInfo tableInfo) { - String tableName = tableInfo.getTableName(); - List<ColumnDescription> cdList = tableInfo.getColumns(); - - StringBuilder query = new StringBuilder(); - query.append("create table " + tableName + " ("); - Collections.sort(cdList, new Comparator<ColumnDescription>() { - @Override - public int compare(ColumnDescription o1, ColumnDescription o2) { - return o1.getPosition() - o2.getPosition(); - } - }); - - boolean first = true; - for (ColumnDescription cd : cdList) { - if (first) { - first = false; - } else { - query.append(", "); - } - - query.append(cd.getName() + " " + cd.getType()); - } - - query.append(") ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS TEXTFILE;"); - - String queryString = query.toString(); - LOG.info("Query : %S", queryString); - return queryString; - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/0747b6c7/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/TableDataReader.java ---------------------------------------------------------------------- diff --git 
a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/TableDataReader.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/TableDataReader.java new file mode 100644 index 0000000..e9bdb92 --- /dev/null +++ b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/TableDataReader.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.view.hive.resources.uploads; + +import org.apache.ambari.view.hive.client.Row; +import org.apache.commons.csv.CSVFormat; +import org.apache.commons.csv.CSVPrinter; + +import java.io.IOException; +import java.io.Reader; +import java.io.StringReader; +import java.io.StringWriter; +import java.util.Iterator; + +/** + * Takes row iterator as input. + * iterate over rows and creates a CSV formated stream separating rows by endline "\n" + * Note : column values should not contain "\n". 
+ */ +public class TableDataReader extends Reader { + + private static final int CAPACITY = 1024; + private StringReader stringReader = new StringReader(""); + + private Iterator<Row> iterator; + private static final CSVFormat CSV_FORMAT = CSVFormat.DEFAULT.withRecordSeparator("\n"); + + public TableDataReader(Iterator<Row> rowIterator) { + this.iterator = rowIterator; + } + + @Override + public int read(char[] cbuf, int off, int len) throws IOException { + + int totalLen = len; + int count = 0; + do { + int n = stringReader.read(cbuf, off, len); + + if (n != -1) { + // n were read + len = len - n; // len more to be read + off = off + n; // off now shifted to n more + count += n; + } + + if (count == totalLen) return count; // all totalLen characters were read + + if (iterator.hasNext()) { // keep reading as long as we keep getting rows + StringWriter stringWriter = new StringWriter(CAPACITY); + CSVPrinter csvPrinter = new CSVPrinter(stringWriter, CSV_FORMAT); + Row row = iterator.next(); + csvPrinter.printRecord(row.getRow()); + stringReader.close(); // close the old string reader + stringReader = new StringReader(stringWriter.getBuffer().toString()); + csvPrinter.close(); + stringWriter.close(); + } else { + return count == 0 ? 
-1 : count; + } + } while (count < totalLen); + + return count; + } + + @Override + public void close() throws IOException { + + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/0747b6c7/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/TableInfo.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/TableInfo.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/TableInfo.java deleted file mode 100644 index ed4943d..0000000 --- a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/TableInfo.java +++ /dev/null @@ -1,62 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ambari.view.hive.resources.uploads; - -import org.apache.ambari.view.hive.client.ColumnDescription; - -import java.util.List; - -public class TableInfo { - private String tableName; - private String databaseName; - private List<ColumnDescription> columns; - - public String getTableName() { - return tableName; - } - - public void setTableName(String tableName) { - this.tableName = tableName; - } - - public String getDatabaseName() { - return databaseName; - } - - public void setDatabaseName(String databaseName) { - this.databaseName = databaseName; - } - - public List<ColumnDescription> getColumns() { - return columns; - } - - public void setColumns(List<ColumnDescription> columns) { - this.columns = columns; - } - - public TableInfo(String databaseName, String tableName, List<ColumnDescription> columns) { - this.tableName = tableName; - this.databaseName = databaseName; - this.columns = columns; - } - - public TableInfo() { - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/0747b6c7/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/TableInput.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/TableInput.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/TableInput.java new file mode 100644 index 0000000..4a50e93 --- /dev/null +++ b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/TableInput.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.view.hive.resources.uploads; + +import java.util.List; + +/** + * used as input in REST call + */ +class TableInput { + public Boolean isFirstRowHeader; + public List<ColumnDescriptionImpl> header; + public String tableName; + public String databaseName; + /** + * the format of the file created for the table inside hive : ORC TEXTFILE etc. + */ + public String fileType; + /** + * the format of the file uploaded. CSV, JSON, XML etc. + */ + public String fileFormat; + + public TableInput() { + } + + public Boolean getIsFirstRowHeader() { + return isFirstRowHeader; + } + + public void setIsFirstRowHeader(Boolean isFirstRowHeader) { + this.isFirstRowHeader = isFirstRowHeader; + } + + public List<ColumnDescriptionImpl> getHeader() { + return header; + } + + public void setHeader(List<ColumnDescriptionImpl> header) { + this.header = header; + } + + public String getTableName() { + return tableName; + } + + public void setTableName(String tableName) { + this.tableName = tableName; + } + + public String getDatabaseName() { + return databaseName; + } + + public void setDatabaseName(String databaseName) { + this.databaseName = databaseName; + } + + public String getFileType() { + return fileType; + } + + public void setFileType(String fileType) { + this.fileType = fileType; + } + + public String getFileFormat() { + return fileFormat; + } + + public void setFileFormat(String fileFormat) { + this.fileFormat = fileFormat; + } +} 
http://git-wip-us.apache.org/repos/asf/ambari/blob/0747b6c7/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/UploadFromHdfsInput.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/UploadFromHdfsInput.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/UploadFromHdfsInput.java new file mode 100644 index 0000000..9b052ab --- /dev/null +++ b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/UploadFromHdfsInput.java @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ambari.view.hive.resources.uploads; + +import java.io.Serializable; + +public class UploadFromHdfsInput implements Serializable{ + private Boolean isFirstRowHeader; + private String inputFileType; + private String hdfsPath; + private String tableName; + private String databaseName; + + public UploadFromHdfsInput() { + } + + public UploadFromHdfsInput(Boolean isFirstRowHeader, String inputFileType, String hdfsPath, String tableName, String databaseName) { + this.isFirstRowHeader = isFirstRowHeader; + this.inputFileType = inputFileType; + this.hdfsPath = hdfsPath; + this.tableName = tableName; + this.databaseName = databaseName; + } + + public Boolean getIsFirstRowHeader() { + return isFirstRowHeader; + } + + public void setIsFirstRowHeader(Boolean firstRowHeader) { + isFirstRowHeader = firstRowHeader; + } + + public String getInputFileType() { + return inputFileType; + } + + public void setInputFileType(String inputFileType) { + this.inputFileType = inputFileType; + } + + public String getHdfsPath() { + return hdfsPath; + } + + public void setHdfsPath(String hdfsPath) { + this.hdfsPath = hdfsPath; + } + + public String getTableName() { + return tableName; + } + + public void setTableName(String tableName) { + this.tableName = tableName; + } + + public String getDatabaseName() { + return databaseName; + } + + public void setDatabaseName(String databaseName) { + this.databaseName = databaseName; + } + + @Override + public String toString() { + return "UploadFromHdfsInput{" + + "isFirstRowHeader=" + isFirstRowHeader + + ", inputFileType='" + inputFileType + '\'' + + ", hdfsPath='" + hdfsPath + '\'' + + ", tableName='" + tableName + '\'' + + ", databaseName='" + databaseName + '\'' + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/0747b6c7/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/UploadService.java ---------------------------------------------------------------------- diff --git 
a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/UploadService.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/UploadService.java index 8b5b851..6935ec5 100644 --- a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/UploadService.java +++ b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/UploadService.java @@ -21,40 +21,44 @@ package org.apache.ambari.view.hive.resources.uploads; import com.sun.jersey.core.header.FormDataContentDisposition; import com.sun.jersey.multipart.FormDataParam; import org.apache.ambari.view.hive.BaseService; -import org.apache.ambari.view.hive.client.ColumnDescription; -import org.apache.ambari.view.hive.client.HiveClientException; import org.apache.ambari.view.hive.persistence.utils.ItemNotFound; -import org.apache.ambari.view.hive.resources.jobs.NoOperationStatusSetException; import org.apache.ambari.view.hive.resources.jobs.viewJobs.Job; import org.apache.ambari.view.hive.resources.jobs.viewJobs.JobController; import org.apache.ambari.view.hive.resources.jobs.viewJobs.JobImpl; import org.apache.ambari.view.hive.resources.jobs.viewJobs.JobResourceManager; +import org.apache.ambari.view.hive.resources.uploads.parsers.DataParser; +import org.apache.ambari.view.hive.resources.uploads.parsers.ParseOptions; +import org.apache.ambari.view.hive.resources.uploads.parsers.PreviewData; +import org.apache.ambari.view.hive.resources.uploads.query.*; import org.apache.ambari.view.hive.utils.ServiceFormattedException; import org.apache.ambari.view.hive.utils.SharedObjectsFactory; import org.apache.ambari.view.utils.ambari.AmbariApi; +import org.apache.commons.io.FilenameUtils; import org.apache.commons.io.input.ReaderInputStream; +import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; -import org.json.simple.JSONArray; import org.json.simple.JSONObject; -import 
org.json.simple.JSONValue; import javax.ws.rs.*; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; import java.io.*; import java.lang.reflect.InvocationTargetException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; /** - * Servlet for queries + * UI driven end points for creation of new hive table and inserting data into it. + * It uploads a file, parses it partially based on its type, generates preview, + * creates temporary hive table for storage as CSV and actual hive table, + * uploads the file again, parses it, create CSV stream and upload to hdfs in temporary table, + * insert rows from temporary table to actual table, delete temporary table. + * <p/> * API: - * POST /preview - * POST /upload - * POST /createTable - * GET /createTable/status + * POST /preview : takes stream, parses it and returns preview rows, headers and column type suggestions + * POST /createTable : runs hive query to create table in hive + * POST /upload : takes stream, parses it and converts it into CSV and uploads it to the temporary table + * POST /insertIntoTable : runs hive query to insert data from temporary table to actual hive table + * POST /deleteTable : deletes the temporary table */ public class UploadService extends BaseService { @@ -62,107 +66,255 @@ public class UploadService extends BaseService { protected JobResourceManager resourceManager; - final private String HIVE_META_STORE_LOCATION_KEY = "hive.metastore.warehouse.dir"; - final private String HIVE_SITE = "hive-site"; - final private String HIVE_DEFAULT_DB = "default"; + final private static String HIVE_META_STORE_LOCATION_KEY = "hive.metastore.warehouse.dir"; + final private static String HIVE_SITE = "hive-site"; + final private static String HIVE_DEFAULT_DB = "default"; + + @POST + @Path("/previewFromHdfs") + @Consumes(MediaType.APPLICATION_JSON) + @Produces(MediaType.APPLICATION_JSON) + public Response 
uploadForPreviewFromHDFS(UploadFromHdfsInput input) { + + InputStream uploadedInputStream = null; + try { + uploadedInputStream = getHDFSFileStream(input.getHdfsPath()); + PreviewData pd = generatePreview(input.getIsFirstRowHeader(), input.getInputFileType(), uploadedInputStream); + String tableName = getBasenameFromPath(input.getHdfsPath()); + return createPreviewResponse(pd, input.getIsFirstRowHeader(),tableName); + } catch (Exception e) { + LOG.error("Exception occurred while generating preview for hdfs file : " + input.getHdfsPath(), e); + throw new ServiceFormattedException(e.getMessage(), e); + } finally { + if (null != uploadedInputStream) { + try { + uploadedInputStream.close(); + } catch (IOException e) { + LOG.error("Exception occured while closing the HDFS file stream for path " + input.getHdfsPath(), e); + } + } + } + } @POST @Path("/preview") @Consumes(MediaType.MULTIPART_FORM_DATA) public Response uploadForPreview( @FormDataParam("file") InputStream uploadedInputStream, - @FormDataParam("file") FormDataContentDisposition fileDetail) { + @FormDataParam("file") FormDataContentDisposition fileDetail, + @FormDataParam("isFirstRowHeader") Boolean isFirstRowHeader, + @FormDataParam("inputFileType") String inputFileType + ) { + try { + PreviewData pd = generatePreview(isFirstRowHeader, inputFileType, uploadedInputStream); + return createPreviewResponse(pd, isFirstRowHeader,getBasename(fileDetail.getFileName())); + } catch (Exception e) { + LOG.error("Exception occurred while generating preview for local file", e); + throw new ServiceFormattedException(e.getMessage(), e); + } + } - ParseOptions parseOptions = new ParseOptions(); - parseOptions.setOption(ParseOptions.OPTIONS_FILE_TYPE, ParseOptions.FILE_TYPE_CSV); - parseOptions.setOption(ParseOptions.OPTIONS_HEADER, ParseOptions.HEADER_FIRST_RECORD); + @Path("/createTable") + @POST + @Consumes(MediaType.APPLICATION_JSON) + @Produces(MediaType.APPLICATION_JSON) + public Response createTable(TableInput 
tableInput) { try { - DataParser dataParser = new DataParser(new InputStreamReader(uploadedInputStream), parseOptions); + List<ColumnDescriptionImpl> header = tableInput.getHeader(); + String databaseName = tableInput.getDatabaseName(); + String tableName = tableInput.getTableName(); + Boolean isFirstRowHeader = tableInput.getIsFirstRowHeader(); + String fileTypeStr = tableInput.getFileType(); + HiveFileType hiveFileType = HiveFileType.valueOf(fileTypeStr); + - dataParser.parsePreview(); + TableInfo ti = new TableInfo(databaseName, tableName, header, hiveFileType); + String tableCreationQuery = generateCreateQuery(ti); - Map<String, Object> retData = new HashMap<String, Object>(); - retData.put("header", dataParser.getHeader()); - retData.put("rows", dataParser.getPreviewRows()); - retData.put("isFirstRowHeader", true); + LOG.info("tableCreationQuery : {}", tableCreationQuery); - JSONObject jsonObject = new JSONObject(retData); - return Response.ok(jsonObject).build(); - } catch (IOException e) { + Job actualTableJob = createJob(tableCreationQuery, databaseName); + String actualTableJobId = actualTableJob.getId(); + + JSONObject jobObject = new JSONObject(); + jobObject.put("jobId", actualTableJobId); + + LOG.info("table creation jobId {}", actualTableJobId); + return Response.ok(jobObject).status(201).build(); + } catch (Exception e) { + LOG.error("Exception occurred while creating table with input : " + tableInput, e); throw new ServiceFormattedException(e.getMessage(), e); } } - public static class TableInput { - public Boolean isFirstRowHeader; - public String header; - public String tableName; - public String databaseName; + @Path("/uploadFromHDFS") + @POST + @Consumes(MediaType.APPLICATION_JSON) + @Produces(MediaType.APPLICATION_JSON) + public Response uploadFileFromHdfs(UploadFromHdfsInput input ) { + if (ParseOptions.InputFileType.CSV.toString().equals(input.getInputFileType()) && input.getIsFirstRowHeader().equals(Boolean.FALSE)) { + // upload using the 
LOAD query + LoadQueryInput loadQueryInput = new LoadQueryInput(input.getHdfsPath(), input.getDatabaseName(), input.getTableName()); + String loadQuery = new QueryGenerator().generateLoadQuery(loadQueryInput); + + try { + Job job = createJob(loadQuery, input.getDatabaseName()); + + JSONObject jo = new JSONObject(); + jo.put("jobId", job.getId()); + + return Response.ok(jo).build(); + } catch (Exception e) { + LOG.error("Exception occurred while creating job for Load From HDFS query : " + loadQuery, e); + throw new ServiceFormattedException(e.getMessage(), e); + } - public TableInput() { + } else { + // create stream and upload + InputStream hdfsStream = null; + try { + hdfsStream = getHDFSFileStream(input.getHdfsPath()); + String path = uploadFileFromStream(hdfsStream, input.getIsFirstRowHeader(),input.getInputFileType(),input.getTableName(), input.getDatabaseName()); + + JSONObject jo = new JSONObject(); + jo.put("uploadedPath", path); + + return Response.ok(jo).build(); + } catch (Exception e) { + LOG.error("Exception occurred while uploading the file from HDFS with path : " + input.getHdfsPath(), e); + throw new ServiceFormattedException(e.getMessage(), e); + } finally { + if (null != hdfsStream) + try { + hdfsStream.close(); + } catch (IOException e) { + LOG.error("Exception occured while closing the HDFS stream for path : " + input.getHdfsPath(), e); + } + } } + } - public Boolean getIsFirstRowHeader() { - return isFirstRowHeader; - } + @Path("/upload") + @POST + @Consumes(MediaType.MULTIPART_FORM_DATA) + @Produces(MediaType.APPLICATION_JSON) + public Response uploadFile( + @FormDataParam("file") InputStream uploadedInputStream, + @FormDataParam("file") FormDataContentDisposition fileDetail, + @FormDataParam("isFirstRowHeader") Boolean isFirstRowHeader, + @FormDataParam("inputFileType") String inputFileType, // the format of the file uploaded. CSV/JSON etc. 
+ @FormDataParam("tableName") String tableName, + @FormDataParam("databaseName") String databaseName + ) { + try { - public void setIsFirstRowHeader(Boolean isFirstRowHeader) { - this.isFirstRowHeader = isFirstRowHeader; - } + String path = uploadFileFromStream(uploadedInputStream,isFirstRowHeader,inputFileType,tableName,databaseName); - public String getHeader() { - return header; + JSONObject jo = new JSONObject(); + jo.put("uploadedPath", path); + return Response.ok(jo).build(); + } catch (Exception e) { + throw new ServiceFormattedException(e.getMessage(), e); } + } - public void setHeader(String header) { - this.header = header; - } + @Path("/insertIntoTable") + @POST + @Consumes(MediaType.APPLICATION_JSON) + @Produces(MediaType.APPLICATION_JSON) + public Response insertFromTempTable(InsertFromQueryInput input) { + try { + String insertQuery = generateInsertFromQuery(input); + LOG.info("insertQuery : {}", insertQuery); + + Job job = createJob(insertQuery, "default"); - public String getTableName() { - return tableName; + JSONObject jo = new JSONObject(); + jo.put("jobId", job.getId()); + + return Response.ok(jo).build(); + } catch (Exception e) { + throw new ServiceFormattedException(e.getMessage(), e); } + } - public void setTableName(String tableName) { - this.tableName = tableName; + @Path("/deleteTable") + @POST + @Consumes(MediaType.APPLICATION_JSON) + @Produces(MediaType.APPLICATION_JSON) + public Response deleteTable(DeleteQueryInput input) { + try { + String deleteQuery = generateDeleteQuery(input); + LOG.info("deleteQuery : {}", deleteQuery); + + Job job = createJob(deleteQuery, "default"); + + JSONObject jo = new JSONObject(); + jo.put("jobId", job.getId()); + + return Response.ok(jo).build(); + } catch (Exception e) { + throw new ServiceFormattedException(e.getMessage(), e); } + } - public String getDatabaseName() { - return databaseName; + private String uploadIntoTable(Reader reader, String databaseName, String tempTableName) { + try { + String 
basePath = getHiveMetaStoreLocation(); + if (null == basePath) + basePath = "/apps/hive/warehouse"; + + if (!basePath.endsWith("/")) + basePath = basePath + "/"; + + if (databaseName != null && !databaseName.equals(HIVE_DEFAULT_DB)) { + basePath = basePath + databaseName + ".db/"; + } + + String fullPath = basePath + tempTableName + "/" + tempTableName + ".csv"; + + LOG.info("Uploading file into : {}", fullPath); + + uploadFile(fullPath, new ReaderInputStream(reader)); + + return fullPath; + } catch (Exception e) { + throw new ServiceFormattedException(e.getMessage(), e); } + } - public void setDatabaseName(String databaseName) { - this.databaseName = databaseName; + private synchronized JobResourceManager getResourceManager() { + if (resourceManager == null) { + SharedObjectsFactory connectionsFactory = getSharedObjectsFactory(); + resourceManager = new JobResourceManager(connectionsFactory, context); } + return resourceManager; } - @Path("/createTable") - @POST - @Consumes(MediaType.APPLICATION_JSON) - @Produces(MediaType.APPLICATION_JSON) - public Response createTable(TableInput tableInput) throws IllegalAccessException, InvocationTargetException, ItemNotFound, NoSuchMethodException { - String header = tableInput.getHeader(); - String databaseName = tableInput.getDatabaseName(); - String tableName = tableInput.getTableName(); - Boolean isFirstRowHeader = (Boolean) tableInput.getIsFirstRowHeader(); - - Object headerObj = JSONValue.parse(header); - JSONArray array = (JSONArray) headerObj; - List<ColumnDescription> cdList = new ArrayList<ColumnDescription>(array.size()); - for (Object o : array) { - JSONObject jo = (JSONObject) o; - String name = (String) jo.get("name"); - String type = (String) jo.get("type"); - Long p = (Long) jo.get("position"); - Integer position = p != null ? 
p.intValue() : 0; - - ColumnDescriptionImpl cdi = new ColumnDescriptionImpl(name, type, position); - cdList.add(cdi); + private synchronized AmbariApi getAmbariApi() { + if (null == ambariApi) { + ambariApi = new AmbariApi(this.context); } + return ambariApi; + } + + private String generateCreateQuery(TableInfo ti) { + return new QueryGenerator().generateCreateQuery(ti); + } + + private String generateInsertFromQuery(InsertFromQueryInput input) { + return new QueryGenerator().generateInsertFromQuery(input); + } + + private String generateDeleteQuery(DeleteQueryInput deleteQueryInput) { + return new QueryGenerator().generateDropTableQuery(deleteQueryInput); + } - Map jobInfo = new HashMap<String, String>();//PropertyUtils.describe(request.job); + private Job createJob(String query, String databaseName) throws InvocationTargetException, IllegalAccessException, ItemNotFound { + Map jobInfo = new HashMap<String, String>(); jobInfo.put("title", "Internal Table Creation"); - jobInfo.put("forcedContent", generateCreateQuery(databaseName, tableName, cdList)); + jobInfo.put("forcedContent", query); jobInfo.put("dataBase", databaseName); LOG.info("jobInfo : " + jobInfo); @@ -174,124 +326,90 @@ public class UploadService extends BaseService { createdJobController.submit(); getResourceManager().saveIfModified(createdJobController); - String filePath = (databaseName == null || databaseName.equals(HIVE_DEFAULT_DB)) ? 
"" : databaseName + ".db/"; - filePath += tableName + "/" + tableName + ".csv"; - - JSONObject jobObject = new JSONObject(); - jobObject.put("jobId", job.getId()); - jobObject.put("filePath", filePath); - - LOG.info("Create table query submitted : file should be uploaded at location : {}", filePath); - return Response.ok(jobObject).status(201).build(); + return job; } - @Path("/createTable/status") - @GET - @Consumes(MediaType.APPLICATION_JSON) - @Produces(MediaType.APPLICATION_JSON) - public Response isTableCreated(@QueryParam("jobId") int jobId) throws IllegalAccessException, NoSuchMethodException, InvocationTargetException, ItemNotFound, HiveClientException, NoOperationStatusSetException { - JobController jobController = getResourceManager().readController(jobId + ""); - LOG.info("jobController.getStatus().status : {} for job : {}", jobController.getStatus().status, jobController.getJob().getId()); - JSONObject jsonObject = new JSONObject(); - jsonObject.put("status", jobController.getStatus().status); - return Response.ok(jsonObject).build(); + private String getHiveMetaStoreLocation() { + return this.getAmbariApi().getCluster().getConfigurationValue(HIVE_SITE, HIVE_META_STORE_LOCATION_KEY); } - @Path("/upload") - @POST - @Consumes(MediaType.MULTIPART_FORM_DATA) - public Response uploadFile( - @FormDataParam("file") InputStream uploadedInputStream, - @FormDataParam("file") FormDataContentDisposition fileDetail, - @FormDataParam("isFirstRowHeader") Boolean isFirstRowHeader, - @FormDataParam("filePath") String filePath + private void uploadFile(final String filePath, InputStream uploadedInputStream) + throws IOException, InterruptedException { + byte[] chunk = new byte[1024]; + FSDataOutputStream out = getSharedObjectsFactory().getHdfsApi().create(filePath, false); + int n = -1; + while ((n = uploadedInputStream.read(chunk)) != -1) { + out.write(chunk, 0, n); + } + out.close(); + } - ) throws IOException, InterruptedException { - LOG.info("inside uploadFile : 
isFirstRowHeader : {} , filePath : {}", isFirstRowHeader, filePath); -/* This is not working as expected. + private PreviewData generatePreview(Boolean isFirstRowHeader, String inputFileType, InputStream uploadedInputStream) throws IOException { ParseOptions parseOptions = new ParseOptions(); - parseOptions.setOption(ParseOptions.OPTIONS_FILE_TYPE, ParseOptions.FILE_TYPE_CSV); - parseOptions.setOption(ParseOptions.HEADERS,cdList); - - if(isFirstRowHeader) - parseOptions.setOption(ParseOptions.OPTIONS_HEADER,ParseOptions.HEADER_FIRST_RECORD); + parseOptions.setOption(ParseOptions.OPTIONS_FILE_TYPE, inputFileType); + if (inputFileType.equals(ParseOptions.InputFileType.CSV.toString()) && !isFirstRowHeader) + parseOptions.setOption(ParseOptions.OPTIONS_HEADER, ParseOptions.HEADER.PROVIDED_BY_USER.toString()); else - parseOptions.setOption(ParseOptions.OPTIONS_HEADER,ParseOptions.HEADER_PROVIDED_BY_USER); - - DataParser dataParser = new DataParser(new InputStreamReader(uploadedInputStream),parseOptions); - - // remove first row if it is header and send the rest to HDFS - if(isFirstRowHeader){ - if( dataParser.iterator().hasNext() ){ - dataParser.iterator().next(); - } - } + parseOptions.setOption(ParseOptions.OPTIONS_HEADER, ParseOptions.HEADER.FIRST_RECORD.toString()); - Reader csvReader = dataParser.getCSVReader(); -*/ + LOG.info("isFirstRowHeader : {}, inputFileType : {}", isFirstRowHeader, inputFileType); - // TODO : workaround alert as above method is not working properly - // remove first row if it is header and send the rest to HDFS - Reader r = new InputStreamReader(uploadedInputStream); - if (isFirstRowHeader) { - BufferedReader br = new BufferedReader(r, 1); // - br.readLine(); // TODO : remove the header line. 
Wrong if first record is beyond first endline - } + DataParser dataParser = new DataParser(new InputStreamReader(uploadedInputStream), parseOptions); - String basePath = getHiveMetaStoreLocation(); - if (null == basePath) - basePath = "/apps/hive/warehouse"; + return dataParser.parsePreview(); - if (!basePath.endsWith("/")) - basePath = basePath + "/"; + } - String fullPath = basePath + filePath; + private Response createPreviewResponse(PreviewData pd, Boolean isFirstRowHeader, String tableName) { - uploadTable(new ReaderInputStream(r), fullPath); + Map<String, Object> retData = new HashMap<String, Object>(); + retData.put("header", pd.getHeader()); + retData.put("rows", pd.getPreviewRows()); + retData.put("isFirstRowHeader", isFirstRowHeader); + retData.put("tableName", tableName); - LOG.info("create the table successfully at : {}", fullPath); - return Response.ok().build(); + JSONObject jsonObject = new JSONObject(retData); + return Response.ok(jsonObject).build(); } - private String getHiveMetaStoreLocation() { - return this.getAmbariApi().getCluster().getConfigurationValue(HIVE_SITE, HIVE_META_STORE_LOCATION_KEY); + private InputStream getHDFSFileStream(String path) throws IOException, InterruptedException { + FSDataInputStream fsStream = getSharedObjectsFactory().getHdfsApi().open(path); + return fsStream; } - private void uploadTable(InputStream is, String path) throws IOException, InterruptedException { - if (!path.endsWith("/")) { - path = path + "/"; - } + private String uploadFileFromStream( + InputStream uploadedInputStream, + Boolean isFirstRowHeader, + String inputFileType, // the format of the file uploaded. CSV/JSON etc. 
+ String tableName, + String databaseName - uploadFile(path, is); - } + ) throws IOException { + LOG.info(" uploading file into databaseName {}, tableName {}", databaseName, tableName); + ParseOptions parseOptions = new ParseOptions(); + parseOptions.setOption(ParseOptions.OPTIONS_FILE_TYPE, inputFileType); - private void uploadFile(final String filePath, InputStream uploadedInputStream) - throws IOException, InterruptedException { - byte[] chunk = new byte[1024]; - FSDataOutputStream out = getSharedObjectsFactory().getHdfsApi().create(filePath, false); - while (uploadedInputStream.read(chunk) != -1) { - out.write(chunk); - } - out.close(); - } + DataParser dataParser = new DataParser(new InputStreamReader(uploadedInputStream), parseOptions); + if (inputFileType.equals(ParseOptions.InputFileType.CSV.toString()) && isFirstRowHeader) + dataParser.extractHeader(); // removes the header line if any from the stream - protected synchronized JobResourceManager getResourceManager() { - if (resourceManager == null) { - SharedObjectsFactory connectionsFactory = getSharedObjectsFactory(); - resourceManager = new JobResourceManager(connectionsFactory, context); - } - return resourceManager; + Reader csvReader = dataParser.getTableDataReader(); + String path = uploadIntoTable(csvReader, databaseName, tableName); + return path; } - protected synchronized AmbariApi getAmbariApi() { - if (null == ambariApi) { - ambariApi = new AmbariApi(this.context); - } - return ambariApi; + private String getBasenameFromPath(String path) { + String fileName = new File(path).getName(); + return getBasename(fileName); } - private String generateCreateQuery(String databaseName, String tableName, List<ColumnDescription> cdList) { - return new QueryGenerator().generateCreateQuery(new TableInfo(databaseName, tableName, cdList)); + private String getBasename(String fileName){ + int index = fileName.indexOf("."); + if(index != -1){ + return fileName.substring(0,index); + } + + return fileName; } } 
http://git-wip-us.apache.org/repos/asf/ambari/blob/0747b6c7/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/parsers/DataParser.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/parsers/DataParser.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/parsers/DataParser.java
new file mode 100644
index 0000000..7eae679
--- /dev/null
+++ b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/parsers/DataParser.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive.resources.uploads.parsers;
+
+import org.apache.ambari.view.hive.client.Row;
+import org.apache.ambari.view.hive.resources.uploads.parsers.csv.CSVParser;
+import org.apache.ambari.view.hive.resources.uploads.parsers.json.JSONParser;
+import org.apache.ambari.view.hive.resources.uploads.parsers.xml.XMLParser;
+
+import java.io.IOException;
+import java.io.Reader;
+import java.util.Iterator;
+
+/**
+ * Wrapper/Decorator over the Stream parsers.
+ * Supports XML/JSON/CSV parsing.
+ * <p/>
+ * The concrete parser is selected by the {@link ParseOptions#OPTIONS_FILE_TYPE} option, whose
+ * string form must name a {@link ParseOptions.InputFileType} constant; every {@link IParser}
+ * method is delegated to that parser.
+ */
+public class DataParser implements IParser {
+
+  private final IParser parser;
+
+  /**
+   * Creates a parser for the file type named in {@code parseOptions}.
+   *
+   * @param reader       source of the data to parse
+   * @param parseOptions parse options; must contain {@link ParseOptions#OPTIONS_FILE_TYPE}
+   * @throws IOException              if the underlying parser cannot be created
+   * @throws IllegalArgumentException if the file type is missing or is not CSV, JSON or XML
+   */
+  public DataParser(Reader reader, ParseOptions parseOptions) throws IOException {
+    Object fileTypeOption = parseOptions.getOption(ParseOptions.OPTIONS_FILE_TYPE);
+    String fileType = fileTypeOption == null ? null : fileTypeOption.toString();
+
+    // Fail fast: an unknown or missing type would otherwise leave 'parser' unset and
+    // surface later as a NullPointerException in the delegating methods below.
+    if (ParseOptions.InputFileType.CSV.toString().equals(fileType)) {
+      parser = new CSVParser(reader, parseOptions);
+    } else if (ParseOptions.InputFileType.JSON.toString().equals(fileType)) {
+      parser = new JSONParser(reader, parseOptions);
+    } else if (ParseOptions.InputFileType.XML.toString().equals(fileType)) {
+      parser = new XMLParser(reader, parseOptions);
+    } else {
+      throw new IllegalArgumentException("Unsupported input file type : " + fileType);
+    }
+  }
+
+  @Override
+  public Reader getTableDataReader() {
+    return parser.getTableDataReader();
+  }
+
+  @Override
+  public PreviewData parsePreview() {
+    return parser.parsePreview();
+  }
+
+  @Override
+  public Row extractHeader() {
+    return parser.extractHeader();
+  }
+
+  @Override
+  public void close() throws IOException {
+    parser.close();
+  }
+
+  @Override
+  public Iterator<Row> iterator() {
+    return parser.iterator();
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/0747b6c7/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/parsers/EndOfDocumentException.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/parsers/EndOfDocumentException.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/parsers/EndOfDocumentException.java
new file mode 100644
index 0000000..6bbe303
--- /dev/null
+++ b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/parsers/EndOfDocumentException.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive.resources.uploads.parsers;
+
+/**
+ * Checked exception for the upload parsers.
+ * NOTE(review): by its name this signals that the end of the input document was reached;
+ * confirm against the parser implementations that throw it.
+ */
+public class EndOfDocumentException extends Exception {
+  public EndOfDocumentException() {
+  }
+
+  public EndOfDocumentException(String message) {
+    super(message);
+  }
+
+  public EndOfDocumentException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+  public EndOfDocumentException(Throwable cause) {
+    super(cause);
+  }
+
+  public EndOfDocumentException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
+    super(message, cause, enableSuppression, writableStackTrace);
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/0747b6c7/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/parsers/IParser.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/parsers/IParser.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/parsers/IParser.java
new file mode 100644
index 0000000..6056e73
--- /dev/null
+++ b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/parsers/IParser.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive.resources.uploads.parsers;
+
+import org.apache.ambari.view.hive.client.Row;
+
+import java.io.IOException;
+import java.io.Reader;
+
+/**
+ * Interface defining methods for Parsers that can be used for generating preview
+ * and uploading table into hive. Iterating over a parser yields the parsed records as
+ * {@link Row}s.
+ */
+public interface IParser extends Iterable<Row> {
+
+  /**
+   * @return the Reader that can be read to get the table data as CSV text data that can be
+   * uploaded directly to HDFS
+   */
+  Reader getTableDataReader();
+
+  /**
+   * @return a preview of the data: the detected column descriptions and the preview rows
+   */
+  PreviewData parsePreview();
+
+  /**
+   * Extracts the header row. Callers use this to consume a header record so that it is not
+   * treated as table data.
+   */
+  Row extractHeader();
+
+  /**
+   * Closes this parser and releases the resources it holds.
+   */
+  void close() throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/0747b6c7/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/parsers/ParseOptions.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/parsers/ParseOptions.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/parsers/ParseOptions.java
new file mode 100644
index 0000000..adbdf7f
--- /dev/null
+++ b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/parsers/ParseOptions.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive.resources.uploads.parsers;
+
+import java.util.HashMap;
+
+public class ParseOptions {
+  public enum InputFileType {
+    CSV,
+    JSON,
+    XML
+  }
+
+  public enum HEADER {
+    FIRST_RECORD,
+    PROVIDED_BY_USER
+  }
+  final public static String OPTIONS_FILE_TYPE = "FILE_TYPE";
+  final public static String OPTIONS_HEADER = "HEADER";
+  final public static String OPTIONS_NUMBER_OF_PREVIEW_ROWS = "NUMBER_OF_PREVIEW_ROWS";
+
+  private HashMap<String, Object> options = new HashMap<>();
+
+  public void setOption(String key, Object value) {
+    this.options.put(key, value);
+  }
+
+  public Object getOption(String key) {
+    return this.options.get(key);
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/0747b6c7/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/parsers/ParseUtils.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/parsers/ParseUtils.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/parsers/ParseUtils.java
new file mode 100644
index 0000000..a17346f
--- /dev/null
+++ b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/parsers/ParseUtils.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive.resources.uploads.parsers;
+
+import org.apache.ambari.view.hive.client.ColumnDescription;
+
+import java.text.ParsePosition;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
+/**
+ * Heuristics for guessing the Hive data type of a single value read from an uploaded file.
+ */
+public class ParseUtils {
+
+  /**
+   * Patterns tried by {@link #isDate(Object)}. The month field is {@code MM}; in
+   * {@link SimpleDateFormat} lower-case {@code mm} means minutes.
+   * NOTE(review): Hive's DATE type reads yyyy-MM-dd text; values in the other patterns may
+   * load as NULL into a DATE column -- confirm whether they should map to DATE at all.
+   */
+  final public static String[] DATE_FORMATS = {"MM/dd/yyyy", "dd/MM/yyyy", "MM-dd-yyyy", "yyyy-MM-dd" /*add more formats*/};
+
+  public static boolean isInteger(Object object) {
+    if (object == null)
+      return false;
+
+    if (object instanceof Integer)
+      return true;
+
+    try {
+      Integer.parseInt(object.toString());
+      return true;
+    } catch (NumberFormatException nfe) {
+      return false;
+    }
+  }
+
+  public static boolean isBoolean(Object object) {
+    if (object == null)
+      return false;
+
+    if (object instanceof Boolean)
+      return true;
+
+    String strValue = object.toString();
+    return strValue.equalsIgnoreCase("true") || strValue.equalsIgnoreCase("false");
+  }
+
+  public static boolean isLong(Object object) {
+    if (object == null)
+      return false;
+
+    if (object instanceof Long)
+      return true;
+
+    try {
+      Long.parseLong(object.toString());
+      return true;
+    } catch (NumberFormatException nfe) {
+      return false;
+    }
+  }
+
+  public static boolean isDouble(Object object) {
+    if (object == null)
+      return false;
+
+    if (object instanceof Double)
+      return true;
+
+    try {
+      Double.parseDouble(object.toString());
+      return true;
+    } catch (NumberFormatException nfe) {
+      return false;
+    }
+  }
+
+  public static boolean isChar(Object object) {
+    if (object == null)
+      return false;
+
+    if (object instanceof Character)
+      return true;
+
+    return object.toString().trim().length() == 1;
+  }
+
+  /**
+   * @return true if the value is a {@link Date} or its whole string form strictly matches
+   * one of {@link #DATE_FORMATS}
+   */
+  public static boolean isDate(Object object) {
+    if (object == null)
+      return false;
+
+    if (object instanceof Date)
+      return true;
+
+    String str = object.toString();
+    for (String format : DATE_FORMATS) {
+      // SimpleDateFormat is not thread-safe, so a new instance is created for every attempt.
+      SimpleDateFormat dateFormat = new SimpleDateFormat(format);
+      // Strict mode rejects out-of-range fields (e.g. month 13) instead of rolling them over.
+      dateFormat.setLenient(false);
+      ParsePosition position = new ParsePosition(0);
+      Date date = dateFormat.parse(str, position);
+      // parse() stops at the first unparseable character; require the whole string to match.
+      if (date != null && position.getIndex() == str.length())
+        return true;
+    }
+
+    return false;
+  }
+
+  /**
+   * Guesses the Hive type of a value. Checks run in the order INT, BIGINT, BOOLEAN, DOUBLE,
+   * DATE, CHAR and the first match wins (so "1" is INT, not BIGINT or DOUBLE); otherwise STRING.
+   */
+  public static ColumnDescription.DataTypes detectHiveDataType(Object object) {
+    if (isInteger(object)) return ColumnDescription.DataTypes.INT;
+    if (isLong(object)) return ColumnDescription.DataTypes.BIGINT;
+    if (isBoolean(object)) return ColumnDescription.DataTypes.BOOLEAN;
+    if (isDouble(object)) return ColumnDescription.DataTypes.DOUBLE;
+    if (isDate(object)) return ColumnDescription.DataTypes.DATE;
+    if (isChar(object)) return ColumnDescription.DataTypes.CHAR;
+
+    return ColumnDescription.DataTypes.STRING;
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/0747b6c7/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/parsers/Parser.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/parsers/Parser.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/parsers/Parser.java
new file mode 100644
index 0000000..d94ad3c
--- /dev/null
+++ b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/parsers/Parser.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive.resources.uploads.parsers;
+
+import org.apache.ambari.view.hive.client.ColumnDescription;
+import org.apache.ambari.view.hive.client.Row;
+import org.apache.ambari.view.hive.resources.uploads.ColumnDescriptionImpl;
+import org.apache.ambari.view.hive.resources.uploads.TableDataReader;
+
+import java.io.Reader;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+/**
+ * Provides the general implementation for parsing JSON, CSV and XML files
+ * to generate preview rows, headers and column types.
+ * Also provides a {@link TableDataReader} for converting any supported type to CSV.
+ */
+public abstract class Parser implements IParser {
+
+  protected Reader reader; // same as CSV reader in this case
+  protected ParseOptions parseOptions;
+  private int numberOfPreviewRows = 10;
+
+  public Parser(Reader originalReader, ParseOptions parseOptions) {
+    this.reader = originalReader;
+    this.parseOptions = parseOptions;
+  }
+
+  /**
+   * Returns the data type that was detected the maximum number of times in the given column.
+   * Ties are resolved in favour of the type with the lowest ordinal.
+   *
+   * @param typeCounts per-column counts indexed by {@code ColumnDescription.DataTypes} ordinal
+   * @param colNum     index of the column to inspect
+   * @return ordinal of the most frequently detected data type
+   */
+  private int getLikelyDataType(int[][] typeCounts, int colNum) {
+    int[] colArray = typeCounts[colNum];
+    int maxIndex = 0;
+    for (int i = 1; i < colArray.length; i++) {
+      if (colArray[i] > colArray[maxIndex])
+        maxIndex = i;
+    }
+
+    return maxIndex;
+  }
+
+  @Override
+  public Reader getTableDataReader() {
+    return new TableDataReader(this.iterator());
+  }
+
+  /**
+   * Reads up to {@code OPTIONS_NUMBER_OF_PREVIEW_ROWS} data rows (10 unless configured) and
+   * guesses each column's Hive type from them.
+   * <p/>
+   * When {@code OPTIONS_HEADER} is {@code FIRST_RECORD} the first record supplies the column
+   * names and the column count, and is also returned as the first preview row. Otherwise the
+   * first data row fixes the column count and columns are named Column0, Column1, ...
+   * Data rows are padded with nulls or truncated to that column count.
+   *
+   * @return the detected column descriptions and the preview rows
+   * @throws NoSuchElementException if an expected header or any data row is missing
+   */
+  @Override
+  public PreviewData parsePreview() {
+    // A missing or non-numeric option keeps the current value instead of failing.
+    Object previewRowsOption = parseOptions.getOption(ParseOptions.OPTIONS_NUMBER_OF_PREVIEW_ROWS);
+    if (previewRowsOption instanceof Number) {
+      numberOfPreviewRows = ((Number) previewRowsOption).intValue();
+    }
+
+    int numberOfRows = numberOfPreviewRows;
+    // size including the header; clamped so a negative option cannot make the capacity invalid.
+    List<Row> previewRows = new ArrayList<Row>(Math.max(numberOfPreviewRows, 0) + 1);
+
+    Row headerRow = null;
+    Integer numOfCols = null;
+    int[][] typeCounts = null;
+
+    Object headerOption = parseOptions.getOption(ParseOptions.OPTIONS_HEADER);
+    if (headerOption != null && ParseOptions.HEADER.FIRST_RECORD.toString().equals(headerOption.toString())) {
+      if (!this.iterator().hasNext()) {
+        throw new NoSuchElementException("Cannot parse Header");
+      }
+      headerRow = extractHeader();
+      numOfCols = headerRow.getRow().length;
+      typeCounts = new int[numOfCols][ColumnDescription.DataTypes.values().length];
+      previewRows.add(headerRow);
+    }
+
+    // find data types.
+    if (!iterator().hasNext()) {
+      throw new NoSuchElementException("No rows in the file.");
+    }
+    Row r = iterator().next();
+    if (null == numOfCols) {
+      numOfCols = r.getRow().length;
+      typeCounts = new int[numOfCols][ColumnDescription.DataTypes.values().length];
+    }
+
+    while (true) {
+      Object[] values = r.getRow();
+      // Every preview row gets exactly numOfCols cells: missing cells stay null, extra ones are dropped.
+      Object[] newValues = new Object[numOfCols];
+      int presentCols = Math.min(numOfCols, values.length);
+      for (int colNum = 0; colNum < presentCols; colNum++) {
+        ColumnDescription.DataTypes type = ParseUtils.detectHiveDataType(values[colNum]);
+        typeCounts[colNum][type.ordinal()]++;
+        newValues[colNum] = values[colNum];
+      }
+      previewRows.add(new Row(newValues));
+
+      numberOfRows--;
+      if (numberOfRows <= 0 || !iterator().hasNext())
+        break;
+
+      r = iterator().next();
+    }
+
+    return new PreviewData(createColumnDescriptions(typeCounts, numOfCols, headerRow), previewRows);
+  }
+
+  /**
+   * Builds one column description per column using the most frequently detected type. Columns
+   * are named after the header row's values when a header is present, else Column0, Column1, ...
+   */
+  private List<ColumnDescription> createColumnDescriptions(int[][] typeCounts, int numOfCols, Row headerRow) {
+    List<ColumnDescription> header = new ArrayList<>(numOfCols);
+    for (int colNum = 0; colNum < numOfCols; colNum++) {
+      int dataTypeId = getLikelyDataType(typeCounts, colNum);
+      ColumnDescription.DataTypes type = ColumnDescription.DataTypes.values()[dataTypeId];
+      String colName = "Column" + colNum;
+      if (null != headerRow) {
+        // Header cells are Objects, not necessarily Strings; a null cell keeps the generated name.
+        Object headerValue = headerRow.getRow()[colNum];
+        if (null != headerValue)
+          colName = headerValue.toString();
+      }
+
+      header.add(new ColumnDescriptionImpl(colName, type.toString(), colNum));
+    }
+    return header;
+  }
+}
