AMBARI-17421 adding opencsv parser, added support for delimiters and endline characters in values. (Nitiraj Rathore via pallavkul)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/d824786a Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/d824786a Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/d824786a Branch: refs/heads/branch-2.4 Commit: d824786a2d36e2b07e088e925f33becd7e488cc6 Parents: b270818 Author: Pallav Kulshreshtha <[email protected]> Authored: Thu Jun 30 17:14:41 2016 +0530 Committer: Pallav Kulshreshtha <[email protected]> Committed: Thu Jun 30 17:16:51 2016 +0530 ---------------------------------------------------------------------- contrib/views/hive/pom.xml | 6 +- .../view/hive/resources/jobs/Aggregator.java | 2 +- .../view/hive/resources/jobs/JobService.java | 1 - .../hive/resources/jobs/viewJobs/JobImpl.java | 11 + .../view/hive/resources/uploads/CSVParams.java | 74 +++++ .../hive/resources/uploads/TableDataReader.java | 37 ++- .../view/hive/resources/uploads/TableInput.java | 45 +-- .../resources/uploads/UploadFromHdfsInput.java | 51 ++- .../hive/resources/uploads/UploadService.java | 190 ++++++----- .../resources/uploads/parsers/DataParser.java | 10 +- .../hive/resources/uploads/parsers/IParser.java | 13 - .../resources/uploads/parsers/ParseOptions.java | 12 + .../hive/resources/uploads/parsers/Parser.java | 11 +- .../resources/uploads/parsers/RowIterator.java | 6 +- .../uploads/parsers/csv/CSVIterator.java | 57 ---- .../uploads/parsers/csv/CSVParser.java | 55 ---- .../parsers/csv/commonscsv/CSVIterator.java | 57 ++++ .../parsers/csv/commonscsv/CSVParser.java | 88 ++++++ .../parsers/csv/opencsv/OpenCSVIterator.java | 56 ++++ .../parsers/csv/opencsv/OpenCSVParser.java | 92 ++++++ .../uploads/query/InsertFromQueryInput.java | 26 +- .../resources/uploads/query/QueryGenerator.java | 84 +++-- .../hive/resources/uploads/query/RowFormat.java | 57 ++++ .../hive/resources/uploads/query/TableInfo.java | 33 +- .../ui/hive-web/app/controllers/upload-table.js | 303 ++++++++++++++---- 
.../ui/hive-web/app/initializers/i18n.js | 27 +- .../resources/ui/hive-web/app/styles/app.scss | 10 + .../ui/hive-web/app/templates/upload-table.hbs | 193 +++++++++--- .../hive/resources/upload/CSVParserTest.java | 179 ++++++++++- .../resources/upload/DataParserCSVTest.java | 9 +- .../resources/upload/DataParserJSONTest.java | 4 +- .../resources/upload/DataParserXMLTest.java | 4 +- .../resources/upload/OpenCSVParserTest.java | 313 +++++++++++++++++++ .../view/hive/resources/upload/OpenCSVTest.java | 248 +++++++++++++++ .../resources/upload/QueryGeneratorTest.java | 40 ++- .../resources/upload/TableDataReaderTest.java | 30 +- 36 files changed, 1970 insertions(+), 464 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/d824786a/contrib/views/hive/pom.xml ---------------------------------------------------------------------- diff --git a/contrib/views/hive/pom.xml b/contrib/views/hive/pom.xml index 3a453bc..9d7db99 100644 --- a/contrib/views/hive/pom.xml +++ b/contrib/views/hive/pom.xml @@ -69,9 +69,9 @@ <version>1.6</version> </dependency> <dependency> - <groupId>org.apache.commons</groupId> - <artifactId>commons-csv</artifactId> - <version>1.0</version> + <groupId>com.opencsv</groupId> + <artifactId>opencsv</artifactId> + <version>3.8</version> </dependency> <dependency> <groupId>org.apache.commons</groupId> http://git-wip-us.apache.org/repos/asf/ambari/blob/d824786a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/Aggregator.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/Aggregator.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/Aggregator.java index 5164a4d..2f0138a 100644 --- a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/Aggregator.java +++ 
b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/Aggregator.java @@ -239,7 +239,7 @@ public class Aggregator { for (HiveQueryId hqid : queries) { operationIdVsHiveId.put(hqid.operationId, hqid.entity); } - LOG.info("operationIdVsHiveId : {} ", operationIdVsHiveId); + LOG.debug("operationIdVsHiveId : {} ", operationIdVsHiveId); //cover case when operationId is present, but not exists in ATS //e.g. optimized queries without executing jobs, like "SELECT * FROM TABLE" List<Job> jobs = viewJobResourceManager.readAll(new OnlyOwnersFilteringStrategy(username)); http://git-wip-us.apache.org/repos/asf/ambari/blob/d824786a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/JobService.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/JobService.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/JobService.java index a540ca0..36c2633 100644 --- a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/JobService.java +++ b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/JobService.java @@ -435,7 +435,6 @@ public class JobService extends BaseService { job.setSessionTag(null); } - LOG.info("allJobs : {}", allJobs); return allJobs; } catch (WebApplicationException ex) { LOG.error("Exception occured while fetching all jobs.", ex); http://git-wip-us.apache.org/repos/asf/ambari/blob/d824786a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/viewJobs/JobImpl.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/viewJobs/JobImpl.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/viewJobs/JobImpl.java index 2e5f0f7..c099cae 100644 --- 
a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/viewJobs/JobImpl.java +++ b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/viewJobs/JobImpl.java @@ -309,4 +309,15 @@ public class JobImpl implements Job { public void setGlobalSettings(String globalSettings) { this.globalSettings = globalSettings; } + + @Override + public String toString() { + return new StringBuilder("JobImpl{") + .append("id='").append(id) + .append(", owner='").append(owner) + .append(", hiveQueryId='").append(hiveQueryId) + .append(", dagId='").append(dagId) + .append(", queryId='").append(queryId) + .append('}').toString(); + } } http://git-wip-us.apache.org/repos/asf/ambari/blob/d824786a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/CSVParams.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/CSVParams.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/CSVParams.java new file mode 100644 index 0000000..355ed6a --- /dev/null +++ b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/CSVParams.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.view.hive.resources.uploads; + +import java.io.Serializable; + +public class CSVParams implements Serializable { + + public static final char DEFAULT_DELIMITER_CHAR = ','; + public static final char DEFAULT_ESCAPE_CHAR = '\\'; + public static final char DEFAULT_QUOTE_CHAR = '"'; + + private Character csvDelimiter; + private Character csvEscape; + private Character csvQuote; + + public CSVParams() { + } + + public CSVParams(Character csvDelimiter, Character csvQuote, Character csvEscape) { + this.csvDelimiter = csvDelimiter; + this.csvQuote = csvQuote; + this.csvEscape = csvEscape; + } + + public Character getCsvDelimiter() { + return csvDelimiter; + } + + public void setCsvDelimiter(Character csvDelimiter) { + this.csvDelimiter = csvDelimiter; + } + + public Character getCsvEscape() { + return csvEscape; + } + + public void setCsvEscape(Character csvEscape) { + this.csvEscape = csvEscape; + } + + public Character getCsvQuote() { + return csvQuote; + } + + public void setCsvQuote(Character csvQuote) { + this.csvQuote = csvQuote; + } + + @Override + public String toString() { + return "CSVParams{" + + "csvDelimiter='" + csvDelimiter + '\'' + + ", csvEscape='" + csvEscape + '\'' + + ", csvQuote='" + csvQuote + '\'' + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/d824786a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/TableDataReader.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/TableDataReader.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/TableDataReader.java index e9bdb92..7725719 100644 --- a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/TableDataReader.java +++ 
b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/TableDataReader.java @@ -18,15 +18,17 @@ package org.apache.ambari.view.hive.resources.uploads; +import com.opencsv.CSVWriter; +import org.apache.ambari.view.hive.client.ColumnDescription; import org.apache.ambari.view.hive.client.Row; -import org.apache.commons.csv.CSVFormat; -import org.apache.commons.csv.CSVPrinter; +import org.apache.commons.codec.binary.Hex; import java.io.IOException; import java.io.Reader; import java.io.StringReader; import java.io.StringWriter; import java.util.Iterator; +import java.util.List; /** * Takes row iterator as input. @@ -36,13 +38,17 @@ import java.util.Iterator; public class TableDataReader extends Reader { private static final int CAPACITY = 1024; + private final List<ColumnDescriptionImpl> header; private StringReader stringReader = new StringReader(""); private Iterator<Row> iterator; - private static final CSVFormat CSV_FORMAT = CSVFormat.DEFAULT.withRecordSeparator("\n"); + private boolean encode = false; + public static final char CSV_DELIMITER = '\001'; - public TableDataReader(Iterator<Row> rowIterator) { + public TableDataReader(Iterator<Row> rowIterator, List<ColumnDescriptionImpl> header, boolean encode) { this.iterator = rowIterator; + this.encode = encode; + this.header = header; } @Override @@ -64,9 +70,28 @@ public class TableDataReader extends Reader { if (iterator.hasNext()) { // keep reading as long as we keep getting rows StringWriter stringWriter = new StringWriter(CAPACITY); - CSVPrinter csvPrinter = new CSVPrinter(stringWriter, CSV_FORMAT); + CSVWriter csvPrinter = new CSVWriter(stringWriter,CSV_DELIMITER); Row row = iterator.next(); - csvPrinter.printRecord(row.getRow()); + // encode values so that \n and \r are overridden + Object[] columnValues = row.getRow(); + String[] columns = new String[columnValues.length]; + + for(int i = 0; i < columnValues.length; i++){ + String type = header.get(i).getType(); + if(this.encode && 
+ ( + ColumnDescription.DataTypes.STRING.toString().equals(type) + || ColumnDescription.DataTypes.VARCHAR.toString().equals(type) + || ColumnDescription.DataTypes.CHAR.toString().equals(type) + ) + ){ + columns[i] = Hex.encodeHexString(((String)columnValues[i]).getBytes()); //default charset + }else { + columns[i] = (String) columnValues[i]; + } + } + + csvPrinter.writeNext(columns,false); stringReader.close(); // close the old string reader stringReader = new StringReader(stringWriter.getBuffer().toString()); csvPrinter.close(); http://git-wip-us.apache.org/repos/asf/ambari/blob/d824786a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/TableInput.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/TableInput.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/TableInput.java index f7fbbba..1d5adf4 100644 --- a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/TableInput.java +++ b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/TableInput.java @@ -18,20 +18,13 @@ package org.apache.ambari.view.hive.resources.uploads; -import java.util.List; +import org.apache.ambari.view.hive.resources.uploads.query.TableInfo; /** * used as input in REST call */ -class TableInput { +class TableInput extends TableInfo { public Boolean isFirstRowHeader = Boolean.FALSE; - public List<ColumnDescriptionImpl> header; - public String tableName; - public String databaseName; - /** - * the format of the file created for the table inside hive : ORC TEXTFILE etc. 
- */ - public String fileType; public TableInput() { } @@ -44,40 +37,8 @@ class TableInput { this.isFirstRowHeader = isFirstRowHeader; } - public List<ColumnDescriptionImpl> getHeader() { - return header; - } - - public void setHeader(List<ColumnDescriptionImpl> header) { - this.header = header; - } - - public String getTableName() { - return tableName; - } - - public void setTableName(String tableName) { - this.tableName = tableName; - } - - public String getDatabaseName() { - return databaseName; - } - - public void setDatabaseName(String databaseName) { - this.databaseName = databaseName; - } - - public String getFileType() { - return fileType; - } - - public void setFileType(String fileType) { - this.fileType = fileType; - } - public void validate(){ - if( null == this.getFileType()){ + if( null == this.getHiveFileType()){ throw new IllegalArgumentException("fileType parameter cannot be null."); } if( null == this.getTableName()){ http://git-wip-us.apache.org/repos/asf/ambari/blob/d824786a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/UploadFromHdfsInput.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/UploadFromHdfsInput.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/UploadFromHdfsInput.java index 14bd27a..af20aff 100644 --- a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/UploadFromHdfsInput.java +++ b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/UploadFromHdfsInput.java @@ -19,6 +19,7 @@ package org.apache.ambari.view.hive.resources.uploads; import java.io.Serializable; +import java.util.List; public class UploadFromHdfsInput implements Serializable{ private Boolean isFirstRowHeader = Boolean.FALSE; @@ -26,16 +27,54 @@ public class UploadFromHdfsInput implements Serializable{ private String hdfsPath; 
private String tableName; private String databaseName; + private List<ColumnDescriptionImpl> header; + private boolean containsEndlines; + + private String csvDelimiter; + private String csvEscape; + private String csvQuote; public UploadFromHdfsInput() { } - public UploadFromHdfsInput(Boolean isFirstRowHeader, String inputFileType, String hdfsPath, String tableName, String databaseName) { - this.isFirstRowHeader = isFirstRowHeader; - this.inputFileType = inputFileType; - this.hdfsPath = hdfsPath; - this.tableName = tableName; - this.databaseName = databaseName; + public String getCsvDelimiter() { + return csvDelimiter; + } + + public List<ColumnDescriptionImpl> getHeader() { + return header; + } + + public void setHeader(List<ColumnDescriptionImpl> header) { + this.header = header; + } + + public boolean isContainsEndlines() { + return containsEndlines; + } + + public void setContainsEndlines(boolean containsEndlines) { + this.containsEndlines = containsEndlines; + } + + public void setCsvDelimiter(String csvDelimiter) { + this.csvDelimiter = csvDelimiter; + } + + public String getCsvEscape() { + return csvEscape; + } + + public void setCsvEscape(String csvEscape) { + this.csvEscape = csvEscape; + } + + public String getCsvQuote() { + return csvQuote; + } + + public void setCsvQuote(String csvQuote) { + this.csvQuote = csvQuote; } public Boolean getIsFirstRowHeader() { http://git-wip-us.apache.org/repos/asf/ambari/blob/d824786a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/UploadService.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/UploadService.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/UploadService.java index ad10751..a83d17d 100644 --- a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/UploadService.java +++ 
b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/UploadService.java @@ -29,24 +29,38 @@ import org.apache.ambari.view.hive.resources.jobs.viewJobs.JobResourceManager; import org.apache.ambari.view.hive.resources.uploads.parsers.DataParser; import org.apache.ambari.view.hive.resources.uploads.parsers.ParseOptions; import org.apache.ambari.view.hive.resources.uploads.parsers.PreviewData; -import org.apache.ambari.view.hive.resources.uploads.query.*; -import org.apache.ambari.view.hive.utils.HiveClientFormattedException; +import org.apache.ambari.view.hive.resources.uploads.query.DeleteQueryInput; +import org.apache.ambari.view.hive.resources.uploads.query.InsertFromQueryInput; +import org.apache.ambari.view.hive.resources.uploads.query.QueryGenerator; +import org.apache.ambari.view.hive.resources.uploads.query.TableInfo; import org.apache.ambari.view.hive.utils.ServiceFormattedException; import org.apache.ambari.view.hive.utils.SharedObjectsFactory; import org.apache.ambari.view.utils.ambari.AmbariApi; import org.apache.commons.io.input.ReaderInputStream; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; +import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.type.TypeReference; import org.json.simple.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.ws.rs.*; +import javax.ws.rs.Consumes; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.WebApplicationException; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; -import java.io.*; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.Reader; import java.lang.reflect.InvocationTargetException; -import java.util.*; +import java.util.HashMap; +import java.util.List; +import java.util.Map; /** * UI driven end points for creation of new hive table 
and inserting data into it. @@ -117,7 +131,8 @@ public class UploadService extends BaseService { try { uploadedInputStream = getHDFSFileStream(input.getHdfsPath()); this.validateForPreview(input); - PreviewData pd = generatePreview(input.getIsFirstRowHeader(), input.getInputFileType(), uploadedInputStream); + CSVParams csvParams = getCsvParams(input.getCsvDelimiter(), input.getCsvQuote(), input.getCsvEscape()); + PreviewData pd = generatePreview(input.getIsFirstRowHeader(), input.getInputFileType(), csvParams, uploadedInputStream); String tableName = getBasenameFromPath(input.getHdfsPath()); return createPreviewResponse(pd, input.getIsFirstRowHeader(), tableName); } catch (WebApplicationException e) { @@ -144,7 +159,10 @@ public class UploadService extends BaseService { @FormDataParam("file") InputStream uploadedInputStream, @FormDataParam("file") FormDataContentDisposition fileDetail, @FormDataParam("isFirstRowHeader") Boolean isFirstRowHeader, - @FormDataParam("inputFileType") String inputFileType + @FormDataParam("inputFileType") String inputFileType, + @FormDataParam("csvDelimiter") String csvDelimiter, + @FormDataParam("csvEscape") String csvEscape, + @FormDataParam("csvQuote") String csvQuote ) { try { if( null == inputFileType) @@ -153,7 +171,9 @@ public class UploadService extends BaseService { if( null == isFirstRowHeader ) isFirstRowHeader = false; - PreviewData pd = generatePreview(isFirstRowHeader, inputFileType, uploadedInputStream); + CSVParams csvParams = getCsvParams(csvDelimiter, csvQuote, csvEscape); + + PreviewData pd = generatePreview(isFirstRowHeader, inputFileType, csvParams, uploadedInputStream); return createPreviewResponse(pd, isFirstRowHeader, getBasename(fileDetail.getFileName())); } catch (WebApplicationException e) { LOG.error(getErrorMessage(e), e); @@ -164,6 +184,35 @@ public class UploadService extends BaseService { } } + private CSVParams getCsvParams(String csvDelimiter, String csvQuote, String csvEscape) { + char csvq = 
CSVParams.DEFAULT_QUOTE_CHAR; + char csvd = CSVParams.DEFAULT_DELIMITER_CHAR; + char csve = CSVParams.DEFAULT_ESCAPE_CHAR; + + if(null != csvDelimiter){ + char[] csvdArray = csvDelimiter.toCharArray(); + if(csvdArray.length > 0 ) { + csvd = csvdArray[0]; + } + } + + if(null != csvQuote){ + char[] csvqArray = csvQuote.toCharArray(); + if(csvqArray.length > 0 ) { + csvq = csvqArray[0]; + } + } + + if(null != csvEscape){ + char[] csveArray = csvEscape.toCharArray(); + if(csveArray.length > 0 ) { + csve = csveArray[0]; + } + } + + return new CSVParams(csvd, csvq, csve); + } + @Path("/createTable") @POST @@ -172,15 +221,8 @@ public class UploadService extends BaseService { public Job createTable(TableInput tableInput) { try { tableInput.validate(); - List<ColumnDescriptionImpl> header = tableInput.getHeader(); String databaseName = tableInput.getDatabaseName(); - String tableName = tableInput.getTableName(); - Boolean isFirstRowHeader = tableInput.getIsFirstRowHeader(); - String fileTypeStr = tableInput.getFileType(); - HiveFileType hiveFileType = HiveFileType.valueOf(fileTypeStr); - - TableInfo ti = new TableInfo(databaseName, tableName, header, hiveFileType); - String tableCreationQuery = generateCreateQuery(ti); + String tableCreationQuery = generateCreateQuery(tableInput); LOG.info("tableCreationQuery : {}", tableCreationQuery); Job job = createJob(tableCreationQuery, databaseName); @@ -200,48 +242,30 @@ public class UploadService extends BaseService { @Consumes(MediaType.APPLICATION_JSON) @Produces(MediaType.APPLICATION_JSON) public Response uploadFileFromHdfs(UploadFromHdfsInput input) { - if (ParseOptions.InputFileType.CSV.toString().equals(input.getInputFileType()) && input.getIsFirstRowHeader().equals(Boolean.FALSE)) { - try { - // upload using the LOAD query - LoadQueryInput loadQueryInput = new LoadQueryInput(input.getHdfsPath(), input.getDatabaseName(), input.getTableName()); - String loadQuery = new QueryGenerator().generateLoadQuery(loadQueryInput); - Job 
job = createJob(loadQuery, input.getDatabaseName()); - - JSONObject jo = new JSONObject(); - jo.put("jobId", job.getId()); - return Response.ok(jo).build(); - } catch (WebApplicationException e) { - LOG.error(getErrorMessage(e), e); - throw e; - } catch (Exception e) { - LOG.error(e.getMessage(), e); - throw new ServiceFormattedException(e); - } - } else { - // create stream and upload - InputStream hdfsStream = null; - try { - hdfsStream = getHDFSFileStream(input.getHdfsPath()); - String path = uploadFileFromStream(hdfsStream, input.getIsFirstRowHeader(), input.getInputFileType(), input.getTableName(), input.getDatabaseName()); - - JSONObject jo = new JSONObject(); - jo.put("uploadedPath", path); - - return Response.ok(jo).build(); - } catch (WebApplicationException e) { - LOG.error(getErrorMessage(e), e); - throw e; - } catch (Exception e) { - LOG.error(e.getMessage(), e); - throw new ServiceFormattedException(e); - } finally { - if (null != hdfsStream) - try { - hdfsStream.close(); - } catch (IOException e) { - LOG.error("Exception occured while closing the HDFS stream for path : " + input.getHdfsPath(), e); - } - } + // create stream and upload + InputStream hdfsStream = null; + try { + hdfsStream = getHDFSFileStream(input.getHdfsPath()); + CSVParams csvParams = getCsvParams(input.getCsvDelimiter(), input.getCsvQuote(), input.getCsvEscape()); + String path = uploadFileFromStream(hdfsStream, input.getIsFirstRowHeader(), input.getInputFileType(), input.getTableName(), input.getDatabaseName(), input.getHeader(), input.isContainsEndlines(), csvParams); + + JSONObject jo = new JSONObject(); + jo.put("uploadedPath", path); + + return Response.ok(jo).build(); + } catch (WebApplicationException e) { + LOG.error(getErrorMessage(e), e); + throw e; + } catch (Exception e) { + LOG.error(e.getMessage(), e); + throw new ServiceFormattedException(e); + } finally { + if (null != hdfsStream) + try { + hdfsStream.close(); + } catch (IOException e) { + LOG.error("Exception 
occured while closing the HDFS stream for path : " + input.getHdfsPath(), e); + } } } @@ -255,10 +279,19 @@ public class UploadService extends BaseService { @FormDataParam("isFirstRowHeader") Boolean isFirstRowHeader, @FormDataParam("inputFileType") String inputFileType, // the format of the file uploaded. CSV/JSON etc. @FormDataParam("tableName") String tableName, - @FormDataParam("databaseName") String databaseName + @FormDataParam("databaseName") String databaseName, + @FormDataParam("header") String header, + @FormDataParam("containsEndlines") boolean containsEndlines, + @FormDataParam("csvDelimiter") String csvDelimiter, + @FormDataParam("csvEscape") String csvEscape, + @FormDataParam("csvQuote") String csvQuote + ) { try { - String path = uploadFileFromStream(uploadedInputStream, isFirstRowHeader, inputFileType, tableName, databaseName); + CSVParams csvParams = getCsvParams(csvDelimiter, csvQuote, csvEscape); + ObjectMapper mapper = new ObjectMapper(); + List<ColumnDescriptionImpl> columnList = mapper.readValue(header, new TypeReference<List<ColumnDescriptionImpl>>(){}); + String path = uploadFileFromStream(uploadedInputStream, isFirstRowHeader, inputFileType, tableName, databaseName, columnList, containsEndlines, csvParams); JSONObject jo = new JSONObject(); jo.put("uploadedPath", path); @@ -370,14 +403,13 @@ public class UploadService extends BaseService { } private Job createJob(String query, String databaseName) throws InvocationTargetException, IllegalAccessException, ItemNotFound { - Map jobInfo = new HashMap<String, String>(); - jobInfo.put("title", "Internal Table Creation"); + Map jobInfo = new HashMap<>(); + jobInfo.put("title", "Internal Job"); jobInfo.put("forcedContent", query); jobInfo.put("dataBase", databaseName); - LOG.info("jobInfo : " + jobInfo); Job job = new JobImpl(jobInfo); - LOG.info("job : " + job); + LOG.info("creating job : {}", job); getResourceManager().create(job); JobController createdJobController = 
getResourceManager().readController(job.getId()); @@ -414,7 +446,7 @@ public class UploadService extends BaseService { else return e.getMessage(); } - private PreviewData generatePreview(Boolean isFirstRowHeader, String inputFileType, InputStream uploadedInputStream) throws Exception { + private PreviewData generatePreview(Boolean isFirstRowHeader, String inputFileType, CSVParams csvParams, InputStream uploadedInputStream) throws Exception { ParseOptions parseOptions = new ParseOptions(); parseOptions.setOption(ParseOptions.OPTIONS_FILE_TYPE, inputFileType); if (inputFileType.equals(ParseOptions.InputFileType.CSV.toString())){ @@ -422,6 +454,10 @@ public class UploadService extends BaseService { parseOptions.setOption(ParseOptions.OPTIONS_HEADER, ParseOptions.HEADER.FIRST_RECORD.toString()); else parseOptions.setOption(ParseOptions.OPTIONS_HEADER, ParseOptions.HEADER.NONE.toString()); + + parseOptions.setOption(ParseOptions.OPTIONS_CSV_DELIMITER, csvParams.getCsvDelimiter()); + parseOptions.setOption(ParseOptions.OPTIONS_CSV_ESCAPE_CHAR, csvParams.getCsvEscape()); + parseOptions.setOption(ParseOptions.OPTIONS_CSV_QUOTE, csvParams.getCsvQuote()); } else parseOptions.setOption(ParseOptions.OPTIONS_HEADER, ParseOptions.HEADER.EMBEDDED.toString()); @@ -434,7 +470,7 @@ public class UploadService extends BaseService { } private Response createPreviewResponse(PreviewData pd, Boolean isFirstRowHeader, String tableName) { - Map<String, Object> retData = new HashMap<String, Object>(); + Map<String, Object> retData = new HashMap<>(); retData.put("header", pd.getHeader()); retData.put("rows", pd.getPreviewRows()); retData.put("isFirstRowHeader", isFirstRowHeader); @@ -454,19 +490,29 @@ public class UploadService extends BaseService { Boolean isFirstRowHeader, String inputFileType, // the format of the file uploaded. CSV/JSON etc. 
String tableName, - String databaseName - + String databaseName, + List<ColumnDescriptionImpl> header, + boolean containsEndlines, + CSVParams csvParams ) throws Exception { LOG.info(" uploading file into databaseName {}, tableName {}", databaseName, tableName); ParseOptions parseOptions = new ParseOptions(); parseOptions.setOption(ParseOptions.OPTIONS_FILE_TYPE, inputFileType); + if(isFirstRowHeader){ + parseOptions.setOption(ParseOptions.OPTIONS_HEADER, ParseOptions.HEADER.FIRST_RECORD.toString()); + }else{ + parseOptions.setOption(ParseOptions.OPTIONS_HEADER, ParseOptions.HEADER.NONE.toString()); + } - DataParser dataParser = new DataParser(new InputStreamReader(uploadedInputStream), parseOptions); + if(null != csvParams){ + parseOptions.setOption(ParseOptions.OPTIONS_CSV_DELIMITER, csvParams.getCsvDelimiter()); + parseOptions.setOption(ParseOptions.OPTIONS_CSV_ESCAPE_CHAR, csvParams.getCsvEscape()); + parseOptions.setOption(ParseOptions.OPTIONS_CSV_QUOTE, csvParams.getCsvQuote()); + } - if (inputFileType.equals(ParseOptions.InputFileType.CSV.toString()) && isFirstRowHeader) - dataParser.extractHeader(); // removes the header line if any from the stream + DataParser dataParser = new DataParser(new InputStreamReader(uploadedInputStream), parseOptions); - Reader csvReader = dataParser.getTableDataReader(); + Reader csvReader = new TableDataReader(dataParser.iterator(), header, containsEndlines); // encode column values into HEX so that \n etc dont appear in the hive table data String path = uploadIntoTable(csvReader, databaseName, tableName); return path; } http://git-wip-us.apache.org/repos/asf/ambari/blob/d824786a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/parsers/DataParser.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/parsers/DataParser.java 
b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/parsers/DataParser.java index d03dd7e..fe2c740 100644 --- a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/parsers/DataParser.java +++ b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/parsers/DataParser.java @@ -19,11 +19,10 @@ package org.apache.ambari.view.hive.resources.uploads.parsers; import org.apache.ambari.view.hive.client.Row; -import org.apache.ambari.view.hive.resources.uploads.parsers.csv.CSVParser; +import org.apache.ambari.view.hive.resources.uploads.parsers.csv.opencsv.OpenCSVParser; import org.apache.ambari.view.hive.resources.uploads.parsers.json.JSONParser; import org.apache.ambari.view.hive.resources.uploads.parsers.xml.XMLParser; -import java.io.IOException; import java.io.Reader; import java.util.Iterator; @@ -37,7 +36,7 @@ public class DataParser implements IParser { public DataParser(Reader reader, ParseOptions parseOptions) throws Exception { if (parseOptions.getOption(ParseOptions.OPTIONS_FILE_TYPE).equals(ParseOptions.InputFileType.CSV.toString())) { - parser = new CSVParser(reader, parseOptions); + parser = new OpenCSVParser(reader, parseOptions); } else if (parseOptions.getOption(ParseOptions.OPTIONS_FILE_TYPE).equals(ParseOptions.InputFileType.JSON.toString())) { parser = new JSONParser(reader, parseOptions); } else if (parseOptions.getOption(ParseOptions.OPTIONS_FILE_TYPE).equals(ParseOptions.InputFileType.XML.toString())) { @@ -46,11 +45,6 @@ public class DataParser implements IParser { } @Override - public Reader getTableDataReader() { - return parser.getTableDataReader(); - } - - @Override public PreviewData parsePreview() { return parser.parsePreview(); } http://git-wip-us.apache.org/repos/asf/ambari/blob/d824786a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/parsers/IParser.java ---------------------------------------------------------------------- diff 
--git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/parsers/IParser.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/parsers/IParser.java index 8b75c04..4f4dc37 100644 --- a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/parsers/IParser.java +++ b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/parsers/IParser.java @@ -18,27 +18,14 @@ package org.apache.ambari.view.hive.resources.uploads.parsers; -import org.apache.ambari.view.hive.client.ColumnDescription; import org.apache.ambari.view.hive.client.Row; -import java.io.File; -import java.io.IOException; -import java.io.InputStream; -import java.io.Reader; -import java.util.List; - /** * Interface defining methods for Parsers that can used for generating preview * and uploading table into hive. */ public interface IParser extends Iterable<Row>, AutoCloseable{ - /** - * @return returns the Reader that can be read to get the table data as CSV Text Data that can be uploaded directly - * to HDFS - */ - Reader getTableDataReader(); - PreviewData parsePreview(); Row extractHeader(); http://git-wip-us.apache.org/repos/asf/ambari/blob/d824786a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/parsers/ParseOptions.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/parsers/ParseOptions.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/parsers/ParseOptions.java index e592b5f..3db4813 100644 --- a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/parsers/ParseOptions.java +++ b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/parsers/ParseOptions.java @@ -21,6 +21,11 @@ package org.apache.ambari.view.hive.resources.uploads.parsers; import java.util.HashMap; 
public class ParseOptions { + public static final String OPTIONS_CSV_DELIMITER = "OPTIONS_CSV_DELIMITER"; + public static final String OPTIONS_CSV_QUOTE = "OPTIONS_CSV_QUOTE"; + public static final String OPTIONS_HEADERS = "OPTIONS_HEADERS"; + public static final String OPTIONS_CSV_ESCAPE_CHAR = "OPTIONS_CSV_ESCAPE_CHAR"; + public enum InputFileType { CSV, JSON, @@ -46,4 +51,11 @@ public class ParseOptions { public Object getOption(String key) { return this.options.get(key); } + + @Override + public String toString() { + return new StringBuilder("ParseOptions{") + .append("options=").append(options) + .append('}').toString(); + } } http://git-wip-us.apache.org/repos/asf/ambari/blob/d824786a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/parsers/Parser.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/parsers/Parser.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/parsers/Parser.java index 49f47c7..782b088 100644 --- a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/parsers/Parser.java +++ b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/parsers/Parser.java @@ -21,7 +21,6 @@ package org.apache.ambari.view.hive.resources.uploads.parsers; import org.apache.ambari.view.hive.client.ColumnDescription; import org.apache.ambari.view.hive.client.Row; import org.apache.ambari.view.hive.resources.uploads.ColumnDescriptionImpl; -import org.apache.ambari.view.hive.resources.uploads.TableDataReader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,6 +38,7 @@ public abstract class Parser implements IParser { protected final static Logger LOG = LoggerFactory.getLogger(Parser.class); + public static final String COLUMN_PREFIX = "column"; protected Reader reader; // same as CSV reader in this case protected ParseOptions 
parseOptions; @@ -70,11 +70,6 @@ public abstract class Parser implements IParser { } @Override - public Reader getTableDataReader() { - return new TableDataReader(this.iterator()); - } - - @Override public PreviewData parsePreview() { LOG.info("generating preview for : {}", this.parseOptions ); @@ -88,7 +83,7 @@ public abstract class Parser implements IParser { } int numberOfRows = numberOfPreviewRows; - previewRows = new ArrayList<>(numberOfPreviewRows + 1); // size including the header. + previewRows = new ArrayList<>(numberOfPreviewRows); Row headerRow = null; Integer numOfCols = null; @@ -152,7 +147,7 @@ public abstract class Parser implements IParser { ColumnDescription.DataTypes type = getLikelyDataType(previewRows,colNum); LOG.info("datatype detected for column {} : {}", colNum, type); - String colName = "Column" + (colNum + 1); + String colName = COLUMN_PREFIX + (colNum + 1); if (null != headerRow) colName = (String) headerRow.getRow()[colNum]; http://git-wip-us.apache.org/repos/asf/ambari/blob/d824786a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/parsers/RowIterator.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/parsers/RowIterator.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/parsers/RowIterator.java index 69fe864..2dc8c22 100644 --- a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/parsers/RowIterator.java +++ b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/parsers/RowIterator.java @@ -41,8 +41,10 @@ public class RowIterator implements Iterator<Row> { public RowIterator(RowMapIterator iterator) { this.iterator = iterator; LinkedHashMap<String, String> obj = iterator.peek(); - if (null != obj) - headers = new LinkedList<>(obj.keySet()); + headers = new LinkedList<>(); + if (null != obj) { + 
headers.addAll(obj.keySet()); + } } @Override http://git-wip-us.apache.org/repos/asf/ambari/blob/d824786a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/parsers/csv/CSVIterator.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/parsers/csv/CSVIterator.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/parsers/csv/CSVIterator.java deleted file mode 100644 index 3342f49..0000000 --- a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/parsers/csv/CSVIterator.java +++ /dev/null @@ -1,57 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ambari.view.hive.resources.uploads.parsers.csv; - -import org.apache.ambari.view.hive.client.Row; -import org.apache.commons.csv.CSVRecord; - -import java.util.Iterator; - -/** - * iterates over the input CSV records and generates Row objects - */ -class CSVIterator implements Iterator<Row> { - - private Iterator<CSVRecord> iterator; - - public CSVIterator(Iterator<CSVRecord> iterator) { - this.iterator = iterator; - } - - @Override - public boolean hasNext() { - return iterator.hasNext(); - } - - @Override - public Row next() { - CSVRecord row = iterator.next(); - Object[] values = new Object[row.size()]; - for (int i = 0; i < values.length; i++) { - values[i] = row.get(i); - } - Row r = new Row(values); - return r; - } - - @Override - public void remove() { - this.iterator.remove(); - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/d824786a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/parsers/csv/CSVParser.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/parsers/csv/CSVParser.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/parsers/csv/CSVParser.java deleted file mode 100644 index a48041c..0000000 --- a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/parsers/csv/CSVParser.java +++ /dev/null @@ -1,55 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.ambari.view.hive.resources.uploads.parsers.csv; - -import org.apache.ambari.view.hive.client.Row; -import org.apache.ambari.view.hive.resources.uploads.parsers.ParseOptions; -import org.apache.ambari.view.hive.resources.uploads.parsers.Parser; -import org.apache.commons.csv.CSVFormat; - -import java.io.*; -import java.util.*; - -/** - * Parses the given Reader which contains CSV stream and extracts headers and rows, and detect datatypes of columns - */ -public class CSVParser extends Parser { - - private CSVIterator iterator; - private org.apache.commons.csv.CSVParser parser; - - public CSVParser(Reader reader, ParseOptions parseOptions) throws IOException { - super(reader, parseOptions); - parser = new org.apache.commons.csv.CSVParser(this.reader, CSVFormat.EXCEL); - iterator = new CSVIterator(parser.iterator()); - } - - @Override - public Row extractHeader() { - return this.iterator().next(); - } - - @Override - public void close() throws Exception { - this.parser.close(); - } - - public Iterator<Row> iterator() { - return iterator; // only one iterator per parser. 
- } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/d824786a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/parsers/csv/commonscsv/CSVIterator.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/parsers/csv/commonscsv/CSVIterator.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/parsers/csv/commonscsv/CSVIterator.java new file mode 100644 index 0000000..e50a87c --- /dev/null +++ b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/parsers/csv/commonscsv/CSVIterator.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ambari.view.hive.resources.uploads.parsers.csv.commonscsv; + +import org.apache.ambari.view.hive.client.Row; +import org.apache.commons.csv.CSVRecord; + +import java.util.Iterator; + +/** + * iterates over the input CSV records and generates Row objects + */ +class CSVIterator implements Iterator<Row> { + + private Iterator<CSVRecord> iterator; + + public CSVIterator(Iterator<CSVRecord> iterator) { + this.iterator = iterator; + } + + @Override + public boolean hasNext() { + return iterator.hasNext(); + } + + @Override + public Row next() { + CSVRecord row = iterator.next(); + Object[] values = new Object[row.size()]; + for (int i = 0; i < values.length; i++) { + values[i] = row.get(i); + } + Row r = new Row(values); + return r; + } + + @Override + public void remove() { + this.iterator.remove(); + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/d824786a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/parsers/csv/commonscsv/CSVParser.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/parsers/csv/commonscsv/CSVParser.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/parsers/csv/commonscsv/CSVParser.java new file mode 100644 index 0000000..ea9c9fb --- /dev/null +++ b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/parsers/csv/commonscsv/CSVParser.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.view.hive.resources.uploads.parsers.csv.commonscsv; + +import org.apache.ambari.view.hive.client.Row; +import org.apache.ambari.view.hive.resources.uploads.parsers.ParseOptions; +import org.apache.ambari.view.hive.resources.uploads.parsers.Parser; +import org.apache.commons.csv.CSVFormat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.Reader; +import java.util.Iterator; + +/** + * Parses the given Reader which contains CSV stream and extracts headers and rows, and detect datatypes of columns + */ +public class CSVParser extends Parser { + private CSVIterator iterator; + private org.apache.commons.csv.CSVParser parser; + private final static Logger LOG = + LoggerFactory.getLogger(CSVParser.class); + + public CSVParser(Reader reader, ParseOptions parseOptions) throws IOException { + super(reader, parseOptions); + CSVFormat format = CSVFormat.DEFAULT; + String optHeader = (String)parseOptions.getOption(ParseOptions.OPTIONS_HEADER); + if(optHeader != null){ + if(optHeader.equals(ParseOptions.HEADER.FIRST_RECORD.toString())) { + format = format.withHeader(); + }else if( optHeader.equals(ParseOptions.HEADER.PROVIDED_BY_USER.toString())){ + String [] headers = (String[]) parseOptions.getOption(ParseOptions.OPTIONS_HEADERS); + format = format.withHeader(headers); + } + } + + Character delimiter = (Character) parseOptions.getOption(ParseOptions.OPTIONS_CSV_DELIMITER); + if(delimiter != null){ + LOG.info("setting delimiter as {}", delimiter); + format = 
format.withDelimiter(delimiter); + } + + Character quote = (Character) parseOptions.getOption(ParseOptions.OPTIONS_CSV_QUOTE); + if( null != quote ){ + LOG.info("setting Quote char : {}", quote); + format = format.withQuote(quote); + } + + Character escape = (Character) parseOptions.getOption(ParseOptions.OPTIONS_CSV_ESCAPE_CHAR); + if(escape != null){ + LOG.info("setting escape as {}", escape); + format = format.withEscape(escape); + } + + parser = new org.apache.commons.csv.CSVParser(this.reader,format ); + iterator = new CSVIterator(parser.iterator()); + } + + @Override + public Row extractHeader() { + return new Row(parser.getHeaderMap().keySet().toArray()); + } + + @Override + public void close() throws Exception { + this.parser.close(); + } + + public Iterator<Row> iterator() { + return iterator; // only one iterator per parser. + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/d824786a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/parsers/csv/opencsv/OpenCSVIterator.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/parsers/csv/opencsv/OpenCSVIterator.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/parsers/csv/opencsv/OpenCSVIterator.java new file mode 100644 index 0000000..3f605cb --- /dev/null +++ b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/parsers/csv/opencsv/OpenCSVIterator.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.view.hive.resources.uploads.parsers.csv.opencsv; + +import org.apache.ambari.view.hive.client.Row; + +import java.util.Iterator; + +/** + * iterates over the input CSV records and generates Row objects + */ +class OpenCSVIterator implements Iterator<Row> { + + private Iterator<String[]> iterator; + + public OpenCSVIterator(Iterator<String[]> iterator) { + this.iterator = iterator; + } + + @Override + public boolean hasNext() { + return iterator.hasNext(); + } + + @Override + public Row next() { + String[] row = iterator.next(); + Object[] values = new Object[row.length]; + for (int i = 0; i < values.length; i++) { + values[i] = row[i]; + } + Row r = new Row(values); + return r; + } + + @Override + public void remove() { + this.iterator.remove(); + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/d824786a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/parsers/csv/opencsv/OpenCSVParser.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/parsers/csv/opencsv/OpenCSVParser.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/parsers/csv/opencsv/OpenCSVParser.java new file mode 100644 index 0000000..0109e91 --- /dev/null +++ b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/parsers/csv/opencsv/OpenCSVParser.java @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * 
or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.view.hive.resources.uploads.parsers.csv.opencsv; + +import com.opencsv.CSVParserBuilder; +import com.opencsv.CSVReader; +import com.opencsv.CSVReaderBuilder; +import org.apache.ambari.view.hive.client.Row; +import org.apache.ambari.view.hive.resources.uploads.parsers.ParseOptions; +import org.apache.ambari.view.hive.resources.uploads.parsers.Parser; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.Reader; +import java.util.Iterator; + +/** + * Parses the given Reader which contains CSV stream and extracts headers and rows + */ +public class OpenCSVParser extends Parser { + private Row headerRow; + private OpenCSVIterator iterator; + private CSVReader csvReader = null; + private final static Logger LOG = + LoggerFactory.getLogger(OpenCSVParser.class); + + public OpenCSVParser(Reader reader, ParseOptions parseOptions) throws IOException { + super(reader, parseOptions); + CSVParserBuilder csvParserBuilder = new CSVParserBuilder(); + CSVReaderBuilder builder = new CSVReaderBuilder(reader); + + Character delimiter = (Character) parseOptions.getOption(ParseOptions.OPTIONS_CSV_DELIMITER); + if(delimiter != null){ + LOG.info("setting delimiter as 
{}", delimiter); + csvParserBuilder = csvParserBuilder.withSeparator(delimiter); + } + + Character quote = (Character) parseOptions.getOption(ParseOptions.OPTIONS_CSV_QUOTE); + if( null != quote ){ + LOG.info("setting Quote char : {}", quote); + csvParserBuilder = csvParserBuilder.withQuoteChar(quote); + } + + Character escapeChar = (Character) parseOptions.getOption(ParseOptions.OPTIONS_CSV_ESCAPE_CHAR); + if( null != escapeChar ){ + LOG.info("setting escapeChar : {}", escapeChar); + csvParserBuilder = csvParserBuilder.withEscapeChar(escapeChar); + } + + builder.withCSVParser(csvParserBuilder.build()); + this.csvReader = builder.build(); + iterator = new OpenCSVIterator(this.csvReader.iterator()); + + String optHeader = (String)parseOptions.getOption(ParseOptions.OPTIONS_HEADER); + if(optHeader != null){ + if(optHeader.equals(ParseOptions.HEADER.FIRST_RECORD.toString())) { + this.headerRow = iterator().hasNext() ? iterator.next() : new Row(new Object[]{}); + } + } + + } + + @Override + public Row extractHeader() { + return headerRow; + } + + @Override + public void close() throws Exception { + this.csvReader.close(); + } + + public Iterator<Row> iterator() { + return iterator; // only one iterator per parser. 
+ } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/d824786a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/query/InsertFromQueryInput.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/query/InsertFromQueryInput.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/query/InsertFromQueryInput.java index 5befc51..c568e0b 100644 --- a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/query/InsertFromQueryInput.java +++ b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/query/InsertFromQueryInput.java @@ -18,20 +18,44 @@ package org.apache.ambari.view.hive.resources.uploads.query; +import org.apache.ambari.view.hive.resources.uploads.ColumnDescriptionImpl; + +import java.util.List; + public class InsertFromQueryInput { private String fromDatabase; private String fromTable; private String toDatabase; private String toTable; + private List<ColumnDescriptionImpl> header; + private Boolean unhexInsert = Boolean.FALSE; public InsertFromQueryInput() { } - public InsertFromQueryInput(String fromDatabase, String fromTable, String toDatabase, String toTable) { + public InsertFromQueryInput(String fromDatabase, String fromTable, String toDatabase, String toTable, List<ColumnDescriptionImpl> header, Boolean unhexInsert) { this.fromDatabase = fromDatabase; this.fromTable = fromTable; this.toDatabase = toDatabase; this.toTable = toTable; + this.header = header; + this.unhexInsert = unhexInsert; + } + + public List<ColumnDescriptionImpl> getHeader() { + return header; + } + + public void setHeader(List<ColumnDescriptionImpl> header) { + this.header = header; + } + + public Boolean getUnhexInsert() { + return unhexInsert; + } + + public void setUnhexInsert(Boolean unhexInsert) { + this.unhexInsert = unhexInsert; } public String 
getFromDatabase() { http://git-wip-us.apache.org/repos/asf/ambari/blob/d824786a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/query/QueryGenerator.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/query/QueryGenerator.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/query/QueryGenerator.java index 6bab229..6db89e0 100644 --- a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/query/QueryGenerator.java +++ b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/query/QueryGenerator.java @@ -19,7 +19,8 @@ package org.apache.ambari.view.hive.resources.uploads.query; import org.apache.ambari.view.hive.client.ColumnDescription; -import org.apache.ambari.view.hive.resources.uploads.*; +import org.apache.ambari.view.hive.resources.uploads.ColumnDescriptionImpl; +import org.apache.ambari.view.hive.resources.uploads.HiveFileType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,10 +37,10 @@ public class QueryGenerator { public String generateCreateQuery(TableInfo tableInfo) { String tableName = tableInfo.getTableName(); - List<ColumnDescriptionImpl> cdList = tableInfo.getColumns(); + List<ColumnDescriptionImpl> cdList = tableInfo.getHeader(); StringBuilder query = new StringBuilder(); - query.append("create table " + tableName + " ("); + query.append("CREATE TABLE ").append(tableName).append(" ("); Collections.sort(cdList, new Comparator<ColumnDescription>() { @Override public int compare(ColumnDescription o1, ColumnDescription o2) { @@ -55,7 +56,7 @@ public class QueryGenerator { query.append(", "); } - query.append(cd.getName() + " " + cd.getType()); + query.append(cd.getName()).append(" ").append(cd.getType()); if (cd.getPrecision() != null) { query.append("(").append(cd.getPrecision()); if (cd.getScale() != null) { @@ 
-68,31 +69,74 @@ public class QueryGenerator { query.append(")"); - if (tableInfo.getHiveFileType() == HiveFileType.TEXTFILE) - query.append(" ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS TEXTFILE;"); - else - query.append(" STORED AS " + tableInfo.getHiveFileType() + ";"); - - String queryString = query.toString(); + if(tableInfo.getHiveFileType().equals(HiveFileType.TEXTFILE)) { + query.append(getRowFormatQuery(tableInfo.getRowFormat())); + } + query.append(" STORED AS ").append(tableInfo.getHiveFileType().toString()); + String queryString = query.append(";").toString(); LOG.info("Query : {}", queryString); return queryString; } + private String getRowFormatQuery(RowFormat rowFormat) { + StringBuilder sb = new StringBuilder(); + if(rowFormat != null) { + sb.append(" ROW FORMAT DELIMITED"); + if(rowFormat.getFieldsTerminatedBy() != null ){ + sb.append(" FIELDS TERMINATED BY '").append(rowFormat.getFieldsTerminatedBy()).append('\''); + } + if(rowFormat.getEscapedBy() != null){ + String escape = String.valueOf(rowFormat.getEscapedBy()); + if(rowFormat.getEscapedBy() == '\\'){ + escape = escape + '\\'; // special handling of slash as its escape char for strings in hive as well. + } + sb.append(" ESCAPED BY '").append(escape).append('\''); + } + } + + return sb.toString(); + } + public String generateInsertFromQuery(InsertFromQueryInput ifqi) { - String insertQuery = "insert into table " + ifqi.getToDatabase() + "." + ifqi.getToTable() + " select * from " + ifqi.getFromDatabase() + "." 
+ ifqi.getFromTable(); - LOG.info("Insert Query : {}", insertQuery); - return insertQuery; + StringBuilder insertQuery = new StringBuilder("INSERT INTO TABLE ").append(ifqi.getToDatabase()).append(".") + .append(ifqi.getToTable()).append(" SELECT "); + + boolean first = true; + for(ColumnDescriptionImpl column : ifqi.getHeader()){ + String type = column.getType(); + boolean unhex = ifqi.getUnhexInsert() && ( + ColumnDescription.DataTypes.STRING.toString().equals(type) + || ColumnDescription.DataTypes.VARCHAR.toString().equals(type) + || ColumnDescription.DataTypes.CHAR.toString().equals(type) + ); + + if(!first){ + insertQuery.append(", "); + } + + if(unhex) { + insertQuery.append("UNHEX("); + } + + insertQuery.append(column.getName()); + + if(unhex) { + insertQuery.append(")"); + } + + first = false; + } + + insertQuery.append(" FROM ").append(ifqi.getFromDatabase()).append(".").append(ifqi.getFromTable()).append(";"); + String query = insertQuery.toString(); + LOG.info("Insert Query : {}", query); + return query; } public String generateDropTableQuery(DeleteQueryInput deleteQueryInput) { - String dropQuery = "drop table " + deleteQueryInput.getDatabase() + "." + deleteQueryInput.getTable(); + String dropQuery = new StringBuilder("DROP TABLE ").append(deleteQueryInput.getDatabase()) + .append(".").append(deleteQueryInput.getTable()).append(";").toString(); LOG.info("Drop Query : {}", dropQuery); return dropQuery; } - - public String generateLoadQuery(LoadQueryInput loadQueryInput) { - String loadFromQuery = "LOAD DATA INPATH '" + loadQueryInput.getHdfsFilePath() + "' INTO TABLE " + loadQueryInput.getDatabaseName() + "." 
+ loadQueryInput.getTableName() + ";" ; - LOG.info("Load From Query : {}", loadFromQuery); - return loadFromQuery; - } } http://git-wip-us.apache.org/repos/asf/ambari/blob/d824786a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/query/RowFormat.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/query/RowFormat.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/query/RowFormat.java new file mode 100644 index 0000000..4c1cb2b --- /dev/null +++ b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/query/RowFormat.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ambari.view.hive.resources.uploads.query; + +public class RowFormat { + private Character fieldsTerminatedBy; + private Character escapedBy; + + private RowFormat() { + } + + public RowFormat(Character fieldsTerminatedBy, Character escapedBy) { + this.fieldsTerminatedBy = fieldsTerminatedBy; + this.escapedBy = escapedBy; + } + + public Character getFieldsTerminatedBy() { + return fieldsTerminatedBy; + } + + public void setFieldsTerminatedBy(Character fieldsTerminatedBy) { + this.fieldsTerminatedBy = fieldsTerminatedBy; + } + + public Character getEscapedBy() { + return escapedBy; + } + + public void setEscapedBy(Character escapedBy) { + this.escapedBy = escapedBy; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder("RowFormat{ fieldsTerminatedBy='"); + sb.append(fieldsTerminatedBy).append( '\'').append(", escapedBy='") + .append(escapedBy).append("\'}"); + + return sb.toString(); + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/d824786a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/query/TableInfo.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/query/TableInfo.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/query/TableInfo.java index 903e5b0..76f448c 100644 --- a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/query/TableInfo.java +++ b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/uploads/query/TableInfo.java @@ -21,17 +21,20 @@ package org.apache.ambari.view.hive.resources.uploads.query; import org.apache.ambari.view.hive.resources.uploads.ColumnDescriptionImpl; import org.apache.ambari.view.hive.resources.uploads.HiveFileType; +import java.io.Serializable; import java.util.List; /** * used as input in Query generation */ -public class TableInfo { 
+public class TableInfo implements Serializable{ private String tableName; private String databaseName; - private List<ColumnDescriptionImpl> columns; + private List<ColumnDescriptionImpl> header; private HiveFileType hiveFileType; + private RowFormat rowFormat; + public String getTableName() { return tableName; } @@ -48,12 +51,12 @@ public class TableInfo { this.databaseName = databaseName; } - public List<ColumnDescriptionImpl> getColumns() { - return columns; + public List<ColumnDescriptionImpl> getHeader() { + return header; } - public void setColumns(List<ColumnDescriptionImpl> columns) { - this.columns = columns; + public void setHeader(List<ColumnDescriptionImpl> header) { + this.header = header; } public HiveFileType getHiveFileType() { @@ -64,18 +67,28 @@ public class TableInfo { this.hiveFileType = hiveFileType; } - public TableInfo(String databaseName, String tableName, List<ColumnDescriptionImpl> columns, HiveFileType hiveFileType) { - this.tableName = tableName; + public RowFormat getRowFormat() { + return rowFormat; + } + + public void setRowFormat(RowFormat rowFormat) { + this.rowFormat = rowFormat; + } + + public TableInfo(String databaseName, String tableName, List<ColumnDescriptionImpl> header, HiveFileType hiveFileType, RowFormat rowFormat) { this.databaseName = databaseName; - this.columns = columns; + this.tableName = tableName; + this.header = header; this.hiveFileType = hiveFileType; + this.rowFormat = rowFormat; } public TableInfo(TableInfo tableInfo) { this.tableName = tableInfo.tableName; this.databaseName = tableInfo.databaseName; - this.columns = tableInfo.columns; + this.header = tableInfo.header; this.hiveFileType = tableInfo.hiveFileType; + this.rowFormat = tableInfo.rowFormat; } public TableInfo() {
