http://git-wip-us.apache.org/repos/asf/ambari/blob/853a1ce7/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/uploads/UploadService.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/uploads/UploadService.java b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/uploads/UploadService.java new file mode 100644 index 0000000..835626d --- /dev/null +++ b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/uploads/UploadService.java @@ -0,0 +1,565 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ambari.view.hive20.resources.uploads; + +import com.sun.jersey.core.header.FormDataContentDisposition; +import com.sun.jersey.multipart.FormDataParam; +import org.apache.ambari.view.ViewContext; +import org.apache.ambari.view.hive.resources.uploads.CSVParams; +import org.apache.ambari.view.hive20.BaseService; +import org.apache.ambari.view.hive20.ConnectionFactory; +import org.apache.ambari.view.hive20.ConnectionSystem; +import org.apache.ambari.view.hive20.client.DDLDelegator; +import org.apache.ambari.view.hive20.client.DDLDelegatorImpl; +import org.apache.ambari.view.hive20.client.Row; +import org.apache.ambari.view.hive20.resources.jobs.viewJobs.Job; +import org.apache.ambari.view.hive20.resources.jobs.viewJobs.JobController; +import org.apache.ambari.view.hive20.resources.jobs.viewJobs.JobImpl; +import org.apache.ambari.view.hive20.resources.jobs.viewJobs.JobResourceManager; +import org.apache.ambari.view.hive20.resources.uploads.parsers.DataParser; +import org.apache.ambari.view.hive20.resources.uploads.parsers.ParseOptions; +import org.apache.ambari.view.hive20.resources.uploads.parsers.PreviewData; +import org.apache.ambari.view.hive20.resources.uploads.query.DeleteQueryInput; +import org.apache.ambari.view.hive20.resources.uploads.query.InsertFromQueryInput; +import org.apache.ambari.view.hive20.resources.uploads.query.QueryGenerator; +import org.apache.ambari.view.hive20.resources.uploads.query.TableInfo; +import org.apache.ambari.view.hive20.utils.ServiceFormattedException; +import org.apache.ambari.view.hive20.utils.SharedObjectsFactory; +import org.apache.ambari.view.utils.ambari.AmbariApi; +import org.apache.commons.io.input.ReaderInputStream; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.type.TypeReference; +import org.json.simple.JSONObject; +import org.slf4j.Logger; +import 
org.slf4j.LoggerFactory;

import javax.inject.Inject;
import javax.ws.rs.Consumes;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * UI driven end points for creation of new hive table and inserting data into it.
 * It uploads a file, parses it partially based on its type, generates preview,
 * creates temporary hive table for storage as CSV and actual hive table,
 * uploads the file again, parses it, creates CSV stream and uploads to hdfs in temporary table,
 * inserts rows from temporary table to actual table, deletes temporary table.
 * <p/>
 * API:
 * POST /preview : takes stream, parses it and returns preview rows, headers and column type suggestions
 * POST /createTable : runs hive query to create table in hive
 * POST /upload : takes stream, parses it and converts it into CSV and uploads it to the temporary table
 * POST /insertIntoTable : runs hive query to insert data from temporary table to actual hive table
 * POST /deleteTable : deletes the temporary table
 */
public class UploadService extends BaseService {

  private final static Logger LOG =
    LoggerFactory.getLogger(UploadService.class);

  @Inject
  protected ViewContext context;

  private AmbariApi ambariApi;
  protected JobResourceManager resourceManager;

  final private static String HIVE_METASTORE_LOCATION_KEY = "hive.metastore.warehouse.dir";
  final private static String HIVE_SITE = "hive-site";
  final private static String HIVE_METASTORE_LOCATION_KEY_VIEW_PROPERTY = HIVE_METASTORE_LOCATION_KEY;
  private static final String HIVE_DEFAULT_METASTORE_LOCATION = "/apps/hive/warehouse";
  final private static String HIVE_DEFAULT_DB = "default";

  /**
   * Validates the input required by the /uploadFromHDFS endpoint.
   * Defaults isFirstRowHeader to false when absent.
   *
   * @throws IllegalArgumentException when a mandatory parameter is missing
   */
  public void validateForUploadFile(UploadFromHdfsInput input) {
    if (null == input.getInputFileType()) {
      throw new IllegalArgumentException("inputFileType parameter cannot be null.");
    }
    if (null == input.getHdfsPath()) {
      throw new IllegalArgumentException("hdfsPath parameter cannot be null.");
    }
    if (null == input.getTableName()) {
      throw new IllegalArgumentException("tableName parameter cannot be null.");
    }
    if (null == input.getDatabaseName()) {
      throw new IllegalArgumentException("databaseName parameter cannot be null.");
    }

    if (input.getIsFirstRowHeader() == null) {
      input.setIsFirstRowHeader(false);
    }
  }

  /**
   * Validates the input required by the /previewFromHdfs endpoint.
   * Defaults isFirstRowHeader to false when absent.
   *
   * @throws IllegalArgumentException when a mandatory parameter is missing
   */
  public void validateForPreview(UploadFromHdfsInput input) {
    if (input.getIsFirstRowHeader() == null) {
      input.setIsFirstRowHeader(false);
    }

    if (null == input.getInputFileType()) {
      throw new IllegalArgumentException("inputFileType parameter cannot be null.");
    }
    if (null == input.getHdfsPath()) {
      throw new IllegalArgumentException("hdfsPath parameter cannot be null.");
    }
  }

  /**
   * Reads the file at the given HDFS path, parses it and returns preview rows,
   * header and column type suggestions.
   */
  @POST
  @Path("/previewFromHdfs")
  @Consumes(MediaType.APPLICATION_JSON)
  @Produces(MediaType.APPLICATION_JSON)
  public Response uploadForPreviewFromHDFS(UploadFromHdfsInput input) {
    InputStream uploadedInputStream = null;
    try {
      // validate before touching HDFS so bad requests fail without opening a stream
      this.validateForPreview(input);
      uploadedInputStream = getHDFSFileStream(input.getHdfsPath());
      CSVParams csvParams = getCsvParams(input.getCsvDelimiter(), input.getCsvQuote(), input.getCsvEscape());
      PreviewData pd = generatePreview(input.getIsFirstRowHeader(), input.getInputFileType(), csvParams, uploadedInputStream);
      String tableName = getBasenameFromPath(input.getHdfsPath());
      return createPreviewResponse(pd, input.getIsFirstRowHeader(), tableName);
    } catch (WebApplicationException e) {
      LOG.error(getErrorMessage(e), e);
      throw e;
    } catch (Exception e) {
      LOG.error(e.getMessage(), e);
      throw new ServiceFormattedException(e);
    } finally {
      if (null != uploadedInputStream) {
        try {
          uploadedInputStream.close();
        } catch (IOException e) {
          LOG.error("Exception occured while closing the HDFS file stream for path " + input.getHdfsPath(), e);
        }
      }
    }
  }

  /**
   * Parses an uploaded multipart file stream and returns preview rows, header
   * and column type suggestions.
   */
  @POST
  @Path("/preview")
  @Consumes(MediaType.MULTIPART_FORM_DATA)
  public Response uploadForPreview(
    @FormDataParam("file") InputStream uploadedInputStream,
    @FormDataParam("file") FormDataContentDisposition fileDetail,
    @FormDataParam("isFirstRowHeader") Boolean isFirstRowHeader,
    @FormDataParam("inputFileType") String inputFileType,
    @FormDataParam("csvDelimiter") String csvDelimiter,
    @FormDataParam("csvEscape") String csvEscape,
    @FormDataParam("csvQuote") String csvQuote
  ) {
    try {
      if (null == inputFileType) {
        throw new IllegalArgumentException("inputFileType parameter cannot be null.");
      }
      if (null == isFirstRowHeader) {
        isFirstRowHeader = false;
      }

      CSVParams csvParams = getCsvParams(csvDelimiter, csvQuote, csvEscape);

      PreviewData pd = generatePreview(isFirstRowHeader, inputFileType, csvParams, uploadedInputStream);
      return createPreviewResponse(pd, isFirstRowHeader, getBasename(fileDetail.getFileName()));
    } catch (WebApplicationException e) {
      LOG.error(getErrorMessage(e), e);
      throw e;
    } catch (Exception e) {
      LOG.error(e.getMessage(), e);
      throw new ServiceFormattedException(e);
    }
  }

  /**
   * Builds a CSVParams from the optional single-character form parameters,
   * falling back to the CSVParams defaults for any that are null or empty.
   * Only the first character of each parameter is used.
   */
  private CSVParams getCsvParams(String csvDelimiter, String csvQuote, String csvEscape) {
    char csvq = CSVParams.DEFAULT_QUOTE_CHAR;
    char csvd = CSVParams.DEFAULT_DELIMITER_CHAR;
    char csve = CSVParams.DEFAULT_ESCAPE_CHAR;

    if (null != csvDelimiter) {
      char[] csvdArray = csvDelimiter.toCharArray();
      if (csvdArray.length > 0) {
        csvd = csvdArray[0];
      }
    }

    if (null != csvQuote) {
      char[] csvqArray = csvQuote.toCharArray();
      if (csvqArray.length > 0) {
        csvq = csvqArray[0];
      }
    }

    if (null != csvEscape) {
      char[] csveArray = csvEscape.toCharArray();
      if (csveArray.length > 0) {
        csve = csveArray[0];
      }
    }

    return new CSVParams(csvd, csvq, csve);
  }

  /**
   * Generates and submits a CREATE TABLE hive job for the given table input.
   * Returns the created Job so the client can poll its status.
   */
  @Path("/createTable")
  @POST
  @Consumes(MediaType.APPLICATION_JSON)
  @Produces(MediaType.APPLICATION_JSON)
  public Response createTable(TableInput tableInput) {
    try {
      tableInput.validate();
      String databaseName = tableInput.getDatabaseName();
      String tableCreationQuery = generateCreateQuery(tableInput);
      LOG.info("tableCreationQuery : {}", tableCreationQuery);

      Job job = createJob(tableCreationQuery, databaseName);
      LOG.info("job created for table creation {}", job);
      return Response.ok(job).build();
    } catch (WebApplicationException e) {
      LOG.error(getErrorMessage(e), e);
      throw e;
    } catch (Throwable e) {
      LOG.error(e.getMessage(), e);
      throw new ServiceFormattedException(e);
    }
  }

  /**
   * Reads a file already present on HDFS, converts it to CSV and uploads it
   * into the target table's storage directory.
   */
  @Path("/uploadFromHDFS")
  @POST
  @Consumes(MediaType.APPLICATION_JSON)
  @Produces(MediaType.APPLICATION_JSON)
  public Response uploadFileFromHdfs(UploadFromHdfsInput input) {
    // create stream and upload
    InputStream hdfsStream = null;
    try {
      hdfsStream = getHDFSFileStream(input.getHdfsPath());
      CSVParams csvParams = getCsvParams(input.getCsvDelimiter(), input.getCsvQuote(), input.getCsvEscape());
      String path = uploadFileFromStream(hdfsStream, input.getIsFirstRowHeader(), input.getInputFileType(), input.getTableName(), input.getDatabaseName(), input.getHeader(), input.isContainsEndlines(), csvParams);

      JSONObject jo = new JSONObject();
      jo.put("uploadedPath", path);

      return Response.ok(jo).build();
    } catch (WebApplicationException e) {
      LOG.error(getErrorMessage(e), e);
      throw e;
    } catch (Exception e) {
      LOG.error(e.getMessage(), e);
      throw new ServiceFormattedException(e);
    } finally {
      if (null != hdfsStream) {
        try {
          hdfsStream.close();
        } catch (IOException e) {
          LOG.error("Exception occured while closing the HDFS stream for path : " + input.getHdfsPath(), e);
        }
      }
    }
  }

  /**
   * Accepts a multipart file upload, converts it to CSV and uploads it into
   * the target table's storage directory.
   */
  @Path("/upload")
  @POST
  @Consumes(MediaType.MULTIPART_FORM_DATA)
  @Produces(MediaType.APPLICATION_JSON)
  public Response uploadFile(
    @FormDataParam("file") InputStream uploadedInputStream,
    @FormDataParam("file") FormDataContentDisposition fileDetail,
    @FormDataParam("isFirstRowHeader") Boolean isFirstRowHeader,
    @FormDataParam("inputFileType") String inputFileType, // the format of the file uploaded. CSV/JSON etc.
    @FormDataParam("tableName") String tableName,
    @FormDataParam("databaseName") String databaseName,
    @FormDataParam("header") String header,
    @FormDataParam("containsEndlines") boolean containsEndlines,
    @FormDataParam("csvDelimiter") String csvDelimiter,
    @FormDataParam("csvEscape") String csvEscape,
    @FormDataParam("csvQuote") String csvQuote
  ) {
    try {
      CSVParams csvParams = getCsvParams(csvDelimiter, csvQuote, csvEscape);
      // "header" arrives as a JSON array of column descriptions
      ObjectMapper mapper = new ObjectMapper();
      List<ColumnDescriptionImpl> columnList = mapper.readValue(header, new TypeReference<List<ColumnDescriptionImpl>>() {});
      String path = uploadFileFromStream(uploadedInputStream, isFirstRowHeader, inputFileType, tableName, databaseName, columnList, containsEndlines, csvParams);

      JSONObject jo = new JSONObject();
      jo.put("uploadedPath", path);
      return Response.ok(jo).build();
    } catch (WebApplicationException e) {
      LOG.error(getErrorMessage(e), e);
      throw e;
    } catch (Exception e) {
      LOG.error(e.getMessage(), e);
      throw new ServiceFormattedException(e);
    }
  }

  /**
   * Generates and submits the INSERT ... SELECT hive job that moves rows from
   * the temporary table into the actual table.
   */
  @Path("/insertIntoTable")
  @POST
  @Consumes(MediaType.APPLICATION_JSON)
  @Produces(MediaType.APPLICATION_JSON)
  public Response insertFromTempTable(InsertFromQueryInput input) {
    try {
      String insertQuery = generateInsertFromQuery(input);
      LOG.info("insertQuery : {}", insertQuery);

      // use the shared constant instead of a stray "default" literal
      Job job = createJob(insertQuery, HIVE_DEFAULT_DB);
      LOG.info("Job created for insert from temp table : {}", job);
      return Response.ok(job).build();
    } catch (WebApplicationException e) {
      LOG.error(getErrorMessage(e), e);
      throw e;
    } catch (Throwable e) {
      LOG.error(e.getMessage(), e);
      throw new ServiceFormattedException(e);
    }
  }

  /**
   * Generates and submits the DROP TABLE hive job that removes the temporary table.
   */
  @Path("/deleteTable")
  @POST
  @Consumes(MediaType.APPLICATION_JSON)
  @Produces(MediaType.APPLICATION_JSON)
  public Response deleteTable(DeleteQueryInput input) {
    try {
      String deleteQuery = generateDeleteQuery(input);
      LOG.info("deleteQuery : {}", deleteQuery);

      Job job = createJob(deleteQuery, HIVE_DEFAULT_DB);
      LOG.info("Job created for delete temp table : {} ", job);
      return Response.ok(job).build();
    } catch (WebApplicationException e) {
      LOG.error(getErrorMessage(e), e);
      throw e;
    } catch (Throwable e) {
      LOG.error(e.getMessage(), e);
      throw new ServiceFormattedException(e);
    }
  }

  /**
   * Streams the CSV reader content into the table's HDFS location and returns
   * the full path of the uploaded file.
   */
  private String uploadIntoTable(Reader reader, String databaseName, String tempTableName) {
    try {
      String fullPath = getHiveMetaStoreLocation(databaseName, tempTableName);
      LOG.info("Uploading file into : {}", fullPath);
      uploadFile(fullPath, new ReaderInputStream(reader));
      return fullPath;
    } catch (WebApplicationException e) {
      LOG.error(getErrorMessage(e), e);
      throw e;
    } catch (Exception e) {
      LOG.error(e.getMessage(), e);
      throw new ServiceFormattedException(e);
    }
  }

  // lazily created; synchronized because the service may be hit concurrently
  private synchronized JobResourceManager getResourceManager() {
    if (resourceManager == null) {
      SharedObjectsFactory connectionsFactory = getSharedObjectsFactory();
      resourceManager = new JobResourceManager(connectionsFactory, context);
    }
    return resourceManager;
  }

  // lazily created; synchronized for the same reason as getResourceManager()
  private synchronized AmbariApi getAmbariApi() {
    if (null == ambariApi) {
      ambariApi = new AmbariApi(this.context);
    }
    return ambariApi;
  }

  private String generateCreateQuery(TableInfo ti) {
    return new QueryGenerator().generateCreateQuery(ti);
  }

  private String generateInsertFromQuery(InsertFromQueryInput input) {
    return new QueryGenerator().generateInsertFromQuery(input);
  }

  private String generateDeleteQuery(DeleteQueryInput deleteQueryInput) {
    return new QueryGenerator().generateDropTableQuery(deleteQueryInput);
  }

  /**
   * Creates, submits and persists an internal hive job running the given query
   * against the given database.
   */
  private Job createJob(String query, String databaseName) throws Throwable {
    Map<String, String> jobInfo = new HashMap<>(); // typed map instead of raw Map
    jobInfo.put("title", "Internal Job");
    jobInfo.put("forcedContent", query);
    jobInfo.put("dataBase", databaseName);

    Job job = new JobImpl(jobInfo);
    LOG.info("creating job : {}", job);
    getResourceManager().create(job);

    JobController createdJobController = getResourceManager().readController(job.getId());
    createdJobController.submit();
    getResourceManager().saveIfModified(createdJobController);

    return job;
  }

  /**
   * Resolves the HDFS path of the file to upload for the given table: the
   * table's "Location:" from DESCRIBE FORMATTED when available, otherwise the
   * warehouse directory convention, with the table name appended as file name.
   */
  private String getHiveMetaStoreLocation(String db, String table) {
    String locationColValue = "Location:";
    String urlString = null;
    DDLDelegator delegator = new DDLDelegatorImpl(context, ConnectionSystem.getInstance().getActorSystem(), ConnectionSystem.getInstance().getOperationController(context));
    List<Row> result = delegator.getTableDescriptionFormatted(ConnectionFactory.create(context), db, table);
    for (Row row : result) {
      if (row != null && row.getRow().length > 1 && row.getRow()[0] != null && row.getRow()[0].toString().trim().equals(locationColValue)) {
        urlString = row.getRow()[1] == null ? null : row.getRow()[1].toString();
        break;
      }
    }

    String tablePath = null;
    if (null != urlString) {
      try {
        URI uri = new URI(urlString);
        tablePath = uri.getPath();
      } catch (URISyntaxException e) {
        // placeholder added so the offending URL is actually logged
        LOG.debug("Error occurred while parsing as url : {}", urlString, e);
      }
    } else {
      String basePath = getHiveMetaStoreLocation();
      if (!basePath.endsWith("/")) {
        basePath = basePath + "/";
      }
      if (db != null && !db.equals(HIVE_DEFAULT_DB)) {
        basePath = basePath + db + ".db/";
      }
      tablePath = basePath + table;
    }

    // the uploaded file is written inside the table directory, named after the table
    return tablePath + "/" + table;
  }

  /**
   * Returns the warehouse base directory from the view property, or the
   * default location when the property is unset.
   */
  private String getHiveMetaStoreLocation() {
    String dir = context.getProperties().get(HIVE_METASTORE_LOCATION_KEY_VIEW_PROPERTY);
    if (dir != null && !dir.trim().isEmpty()) {
      return dir;
    } else {
      LOG.debug("Neither found associated cluster nor found the view property {}. Returning default location : {}", HIVE_METASTORE_LOCATION_KEY_VIEW_PROPERTY, HIVE_DEFAULT_METASTORE_LOCATION);
      return HIVE_DEFAULT_METASTORE_LOCATION;
    }
  }

  /**
   * Copies the given stream to a new HDFS file at filePath.
   * The output stream is now closed even when a write fails (previously leaked).
   */
  private void uploadFile(final String filePath, InputStream uploadedInputStream)
    throws IOException, InterruptedException {
    byte[] chunk = new byte[1024];
    FSDataOutputStream out = getSharedObjectsFactory().getHdfsApi().create(filePath, false);
    try {
      int n;
      while ((n = uploadedInputStream.read(chunk)) != -1) {
        out.write(chunk, 0, n);
      }
    } finally {
      out.close();
    }
  }

  private static String getErrorMessage(WebApplicationException e) {
    if (null != e.getResponse() && null != e.getResponse().getEntity()) {
      return e.getResponse().getEntity().toString();
    } else {
      return e.getMessage();
    }
  }

  /**
   * Parses the beginning of the stream according to the file type and CSV
   * options and returns preview rows plus a suggested header.
   */
  private PreviewData generatePreview(Boolean isFirstRowHeader, String inputFileType, CSVParams csvParams, InputStream uploadedInputStream) throws Exception {
    ParseOptions parseOptions = new ParseOptions();
    parseOptions.setOption(ParseOptions.OPTIONS_FILE_TYPE, inputFileType);
    if (inputFileType.equals(ParseOptions.InputFileType.CSV.toString())) {
      if (isFirstRowHeader) {
        parseOptions.setOption(ParseOptions.OPTIONS_HEADER, ParseOptions.HEADER.FIRST_RECORD.toString());
      } else {
        parseOptions.setOption(ParseOptions.OPTIONS_HEADER, ParseOptions.HEADER.NONE.toString());
      }

      parseOptions.setOption(ParseOptions.OPTIONS_CSV_DELIMITER, csvParams.getCsvDelimiter());
      parseOptions.setOption(ParseOptions.OPTIONS_CSV_ESCAPE_CHAR, csvParams.getCsvEscape());
      parseOptions.setOption(ParseOptions.OPTIONS_CSV_QUOTE, csvParams.getCsvQuote());
    } else {
      // JSON/XML carry their field names embedded in the data
      parseOptions.setOption(ParseOptions.OPTIONS_HEADER, ParseOptions.HEADER.EMBEDDED.toString());
    }

    LOG.info("isFirstRowHeader : {}, inputFileType : {}", isFirstRowHeader, inputFileType);

    DataParser dataParser = new DataParser(new InputStreamReader(uploadedInputStream), parseOptions);

    return dataParser.parsePreview();
  }

  /**
   * Wraps preview data into the JSON shape the UI expects.
   */
  private Response createPreviewResponse(PreviewData pd, Boolean isFirstRowHeader, String tableName) {
    Map<String, Object> retData = new HashMap<>();
    retData.put("header", pd.getHeader());
    retData.put("rows", pd.getPreviewRows());
    retData.put("isFirstRowHeader", isFirstRowHeader);
    retData.put("tableName", tableName);

    JSONObject jsonObject = new JSONObject(retData);
    return Response.ok(jsonObject).build();
  }

  private InputStream getHDFSFileStream(String path) throws IOException, InterruptedException {
    FSDataInputStream fsStream = getSharedObjectsFactory().getHdfsApi().open(path);
    return fsStream;
  }

  /**
   * Parses the stream, converts it into CSV (column values HEX-encoded so
   * embedded newlines etc. do not corrupt the hive table data) and uploads it
   * into the table directory. Returns the uploaded file path.
   */
  private String uploadFileFromStream(
    InputStream uploadedInputStream,
    Boolean isFirstRowHeader,
    String inputFileType, // the format of the file uploaded. CSV/JSON etc.
    String tableName,
    String databaseName,
    List<ColumnDescriptionImpl> header,
    boolean containsEndlines,
    CSVParams csvParams
  ) throws Exception {
    LOG.info(" uploading file into databaseName {}, tableName {}", databaseName, tableName);
    ParseOptions parseOptions = new ParseOptions();
    parseOptions.setOption(ParseOptions.OPTIONS_FILE_TYPE, inputFileType);
    if (isFirstRowHeader) {
      parseOptions.setOption(ParseOptions.OPTIONS_HEADER, ParseOptions.HEADER.FIRST_RECORD.toString());
    } else {
      parseOptions.setOption(ParseOptions.OPTIONS_HEADER, ParseOptions.HEADER.NONE.toString());
    }

    if (null != csvParams) {
      parseOptions.setOption(ParseOptions.OPTIONS_CSV_DELIMITER, csvParams.getCsvDelimiter());
      parseOptions.setOption(ParseOptions.OPTIONS_CSV_ESCAPE_CHAR, csvParams.getCsvEscape());
      parseOptions.setOption(ParseOptions.OPTIONS_CSV_QUOTE, csvParams.getCsvQuote());
    }

    DataParser dataParser = new DataParser(new InputStreamReader(uploadedInputStream), parseOptions);

    Reader csvReader = new TableDataReader(dataParser.iterator(), header, containsEndlines);
    String path = uploadIntoTable(csvReader, databaseName, tableName);
    return path;
  }

  private String getBasenameFromPath(String path) {
    String fileName = new File(path).getName();
    return getBasename(fileName);
  }

  // strips everything from the FIRST dot onwards ("a.tar.gz" -> "a")
  private String getBasename(String fileName) {
    int index = fileName.indexOf(".");
    if (index != -1) {
      return fileName.substring(0, index);
    }

    return fileName;
  }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/853a1ce7/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/uploads/parsers/DataParser.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/uploads/parsers/DataParser.java b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/uploads/parsers/DataParser.java new file mode 100644 index 0000000..5ba7a8b --- /dev/null +++ b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/uploads/parsers/DataParser.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.view.hive20.resources.uploads.parsers; + +import org.apache.ambari.view.hive20.client.Row; +import org.apache.ambari.view.hive20.resources.uploads.parsers.csv.opencsv.OpenCSVParser; +import org.apache.ambari.view.hive20.resources.uploads.parsers.json.JSONParser; +import org.apache.ambari.view.hive20.resources.uploads.parsers.xml.XMLParser; + +import java.io.Reader; +import java.util.Iterator; + +/** + * Wrapper/Decorator over the Stream parsers. + * Supports XML/JSON/CSV parsing. 
+ */ +public class DataParser implements IParser { + + private IParser parser; + + public DataParser(Reader reader, ParseOptions parseOptions) throws Exception { + if (parseOptions.getOption(ParseOptions.OPTIONS_FILE_TYPE).equals(ParseOptions.InputFileType.CSV.toString())) { + parser = new OpenCSVParser(reader, parseOptions); + } else if (parseOptions.getOption(ParseOptions.OPTIONS_FILE_TYPE).equals(ParseOptions.InputFileType.JSON.toString())) { + parser = new JSONParser(reader, parseOptions); + } else if (parseOptions.getOption(ParseOptions.OPTIONS_FILE_TYPE).equals(ParseOptions.InputFileType.XML.toString())) { + parser = new XMLParser(reader, parseOptions); + } + } + + @Override + public PreviewData parsePreview() { + return parser.parsePreview(); + } + + @Override + public Row extractHeader() { + return parser.extractHeader(); + } + + @Override + public void close() throws Exception { + parser.close(); + } + + @Override + public Iterator<Row> iterator() { + return parser.iterator(); + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/853a1ce7/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/uploads/parsers/EndOfDocumentException.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/uploads/parsers/EndOfDocumentException.java b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/uploads/parsers/EndOfDocumentException.java new file mode 100644 index 0000000..2128fab --- /dev/null +++ b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/uploads/parsers/EndOfDocumentException.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
/**
 * Signals that a streaming parser has consumed the entire input document.
 * Thrown by the upload parsers when no further records remain.
 */
public class EndOfDocumentException extends Exception {

  /** Creates the exception with no detail message. */
  public EndOfDocumentException() {
    super();
  }

  /** Creates the exception with the given detail message. */
  public EndOfDocumentException(String message) {
    super(message);
  }

  /** Creates the exception with a detail message and underlying cause. */
  public EndOfDocumentException(String message, Throwable cause) {
    super(message, cause);
  }

  /** Creates the exception wrapping an underlying cause. */
  public EndOfDocumentException(Throwable cause) {
    super(cause);
  }

  /** Full-control constructor mirroring {@link Exception}'s protected form. */
  public EndOfDocumentException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
    super(message, cause, enableSuppression, writableStackTrace);
  }
}
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.view.hive20.resources.uploads.parsers; + +import org.apache.ambari.view.hive20.client.Row; + +/** + * Interface defining methods for Parsers that can used for generating preview + * and uploading table into hive. + */ +public interface IParser extends Iterable<Row>, AutoCloseable{ + + PreviewData parsePreview(); + + Row extractHeader(); +} http://git-wip-us.apache.org/repos/asf/ambari/blob/853a1ce7/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/uploads/parsers/ParseOptions.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/uploads/parsers/ParseOptions.java b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/uploads/parsers/ParseOptions.java new file mode 100644 index 0000000..cb513a3 --- /dev/null +++ b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/uploads/parsers/ParseOptions.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
/**
 * Simple string-keyed bag of parser configuration options shared by the
 * CSV/JSON/XML upload parsers. Keys are the OPTIONS_* constants on this class.
 */
public class ParseOptions {
  public static final String OPTIONS_CSV_DELIMITER = "OPTIONS_CSV_DELIMITER";
  public static final String OPTIONS_CSV_QUOTE = "OPTIONS_CSV_QUOTE";
  public static final String OPTIONS_HEADERS = "OPTIONS_HEADERS";
  public static final String OPTIONS_CSV_ESCAPE_CHAR = "OPTIONS_CSV_ESCAPE_CHAR";

  /** Supported input file formats. */
  public enum InputFileType {
    CSV,
    JSON,
    XML
  }

  /** Where the header (column names) of the document comes from. */
  public enum HEADER {
    FIRST_RECORD,
    PROVIDED_BY_USER, // not used right now but can be used when some metadata of file provide this information
    EMBEDDED, // this one is for JSON/ XML and may be other file formats where its embedded with the data
    NONE // if the file does not contain header information at all
  }

  final public static String OPTIONS_FILE_TYPE = "FILE_TYPE";
  final public static String OPTIONS_HEADER = "HEADER";
  final public static String OPTIONS_NUMBER_OF_PREVIEW_ROWS = "NUMBER_OF_PREVIEW_ROWS";

  // declared against the Map interface and final; HashMap was leaking as the field type
  private final java.util.Map<String, Object> options = new java.util.HashMap<>();

  /** Stores (or overwrites) the value for the given option key. */
  public void setOption(String key, Object value) {
    this.options.put(key, value);
  }

  /** Returns the stored value for the key, or null when unset. */
  public Object getOption(String key) {
    return this.options.get(key);
  }

  @Override
  public String toString() {
    return "ParseOptions{" + "options=" + options + '}';
  }
}
package org.apache.ambari.view.hive20.resources.uploads.parsers;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Timestamp;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;

import static org.apache.ambari.view.hive20.client.ColumnDescription.DataTypes;

/**
 * Helpers that guess the most restrictive Hive data type capable of holding a
 * given value (or a whole column of values) during upload preview.
 */
public class ParseUtils {

  protected final static Logger LOG =
    LoggerFactory.getLogger(ParseUtils.class);

  /**
   * Detection order: the first type here for which every value in a column
   * passes {@link #checkDatatype(Object, DataTypes)} wins; STRING is the catch-all.
   */
  final public static DataTypes[] dataTypeList = {DataTypes.BOOLEAN, DataTypes.INT, DataTypes.BIGINT, DataTypes.DOUBLE, DataTypes.CHAR, DataTypes.TIMESTAMP, DataTypes.DATE, DataTypes.STRING};
  private static final String HIVE_DATE_FORMAT = "yyyy-MM-dd";

  // cheap pre-filter only; strict validation happens in the non-lenient
  // SimpleDateFormat parse inside isDate()
  private static final String HIVE_DATE_FORMAT_REGEX = "^[0-9]{4}-[0-9]?[0-9]-[0-9]?[0-9]$";

  /** @return true when the value is an Integer or parses as a base-10 int. */
  public static boolean isInteger(Object object) {
    if (object == null)
      return false;

    if (object instanceof Integer)
      return true;

    try {
      Integer.parseInt(object.toString()); // value unused; only parseability matters
      return true;
    } catch (NumberFormatException nfe) {
      return false;
    }
  }

  /** @return true for Boolean instances or the strings "true"/"false" (any case). */
  public static boolean isBoolean(Object object) {
    if (object == null)
      return false;

    if (object instanceof Boolean)
      return true;

    String strValue = object.toString();
    return strValue.equalsIgnoreCase("true") || strValue.equalsIgnoreCase("false");
  }

  /** Every non-null value can be represented as a Hive STRING. */
  public static boolean isString(Object object) {
    return object != null;
  }

  /** @return true when the value is a Long or parses as a base-10 long. */
  public static boolean isLong(Object object) {
    if (object == null)
      return false;

    if (object instanceof Long)
      return true;

    try {
      Long.parseLong(object.toString()); // was: unused boxed local
      return true;
    } catch (NumberFormatException nfe) { // parseLong on a non-null string only throws NFE
      return false;
    }
  }

  /**
   * @return true when the value is a Double or parses via Double.parseDouble.
   * NOTE(review): parseDouble also accepts "NaN", "Infinity" and trailing
   * 'd'/'f' suffixes, so such strings are detected as DOUBLE — confirm intended.
   */
  public static boolean isDouble(Object object) {
    if (object == null)
      return false;

    if (object instanceof Double)
      return true;

    try {
      Double.parseDouble(object.toString()); // was: unused boxed local
      return true;
    } catch (NumberFormatException nfe) { // parseDouble on a non-null string only throws NFE
      return false;
    }
  }

  /** @return true for Character instances or strings of exactly one char after trim. */
  public static boolean isChar(Object object) {
    if (object == null)
      return false;

    if (object instanceof Character)
      return true;

    String str = object.toString().trim();
    return str.length() == 1;
  }

  /**
   * @return true for Date instances or strings matching the Hive date format
   * yyyy-MM-dd (regex pre-filter, then a strict non-lenient parse).
   */
  public static boolean isDate(Object object) {
    if (object == null)
      return false;

    if (object instanceof Date)
      return true;

    String str = object.toString();
    if (!str.isEmpty()) { // str is non-null here, so this matches the old Strings.isNotEmpty
      str = str.trim();
      if (str.matches(HIVE_DATE_FORMAT_REGEX)) {
        try {
          SimpleDateFormat sdf = new SimpleDateFormat(HIVE_DATE_FORMAT);
          sdf.setLenient(false); // reject impossible dates such as 2020-02-31
          sdf.parse(str); // was: unused local; only parse success matters
          return true;
        } catch (Exception e) {
          LOG.debug("error while parsing as date string {}, format {}", str, HIVE_DATE_FORMAT, e);
        }
      }
    }
    return false;
  }

  /**
   * @return true for Date instances or strings accepted by Timestamp.valueOf
   * (format yyyy-[m]m-[d]d hh:mm:ss[.f...]).
   */
  public static boolean isTimeStamp(Object object) {
    if (object == null)
      return false;

    if (object instanceof Date)
      return true;

    String str = object.toString();
    try {
      Timestamp.valueOf(str); // was: unused local; only parse success matters
      return true;
    } catch (Exception e) {
      LOG.debug("error while parsing as timestamp string {}", str, e);
    }

    return false;
  }

  /**
   * Detects the narrowest matching Hive type for a single value, trying
   * BOOLEAN, INT, BIGINT, DOUBLE, CHAR, TIMESTAMP, DATE in order.
   *
   * @return the first matching type, or STRING when nothing else fits
   */
  public static DataTypes detectHiveDataType(Object object) {
    if (isBoolean(object)) return DataTypes.BOOLEAN;
    if (isInteger(object)) return DataTypes.INT;
    if (isLong(object)) return DataTypes.BIGINT;
    if (isDouble(object)) return DataTypes.DOUBLE;
    if (isChar(object)) return DataTypes.CHAR;
    if (isTimeStamp(object)) return DataTypes.TIMESTAMP;
    if (isDate(object)) return DataTypes.DATE;

    return DataTypes.STRING;
  }

  /**
   * @return true when {@code object} is representable as the given Hive type;
   * false (with an error log) for types this detector does not support
   */
  public static boolean checkDatatype( Object object, DataTypes datatype){
    switch(datatype){
      case BOOLEAN :
        return isBoolean(object);
      case INT :
        return isInteger(object);
      case BIGINT :
        return isLong(object);
      case DOUBLE:
        return isDouble(object);
      case CHAR:
        return isChar(object);
      case DATE:
        return isDate(object);
      case TIMESTAMP:
        return isTimeStamp(object);
      case STRING:
        return isString(object);
      default:
        LOG.error("this datatype detection is not supported : {}", datatype);
        return false;
    }
  }

  /**
   * Detects the narrowest Hive type that accepts every value in the column.
   * Null values fail every check (including STRING), so a column containing
   * nulls falls through to the STRING default.
   *
   * @param colValues values of one column, taken from the preview rows
   * @return the first type in {@link #dataTypeList} matching all values, else STRING
   */
  public static DataTypes detectHiveColumnDataType(List<Object> colValues) {
    boolean found;
    for(DataTypes datatype : dataTypeList){
      found = true;
      for(Object object : colValues){
        if(!checkDatatype(object,datatype)){
          found = false;
          break;
        }
      }

      if(found) return datatype;
    }

    return DataTypes.STRING; //default
  }
}
package org.apache.ambari.view.hive20.resources.uploads.parsers;

import org.apache.ambari.view.hive20.client.ColumnDescription;
import org.apache.ambari.view.hive20.client.Row;
import org.apache.ambari.view.hive20.resources.uploads.ColumnDescriptionImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Reader;
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;

/**
 * provides general implementation for parsing JSON,CSV,XML file
 * to generate preview rows, headers and column types
 * also provides TableDataReader for converting any type to CSV.
 */
public abstract class Parser implements IParser {

  protected final static Logger LOG =
    LoggerFactory.getLogger(Parser.class);
  public static final String COLUMN_PREFIX = "column";

  protected Reader reader; // same as CSV reader in this case
  protected ParseOptions parseOptions;
  private int numberOfPreviewRows = 10; // default; overridden by OPTIONS_NUMBER_OF_PREVIEW_ROWS

  public Parser(Reader originalReader, ParseOptions parseOptions) {
    this.reader = originalReader;
    this.parseOptions = parseOptions;
  }

  /**
   * Finds the narrowest Hive datatype that is valid for all the values of one column.
   *
   * @param rows : non empty list of rows
   * @param colNum : to detect datatype for this column number.
   * @return data type for that column
   */
  private ColumnDescription.DataTypes getLikelyDataType(List<Row> rows, int colNum) {
    // order of detection BOOLEAN,INT,BIGINT,DOUBLE,DATE,CHAR,STRING
    List<Object> colValues = new ArrayList<>(rows.size());
    for( Row row : rows ){
      colValues.add(row.getRow()[colNum]);
    }

    return ParseUtils.detectHiveColumnDataType(colValues);
  }

  /**
   * Reads up to numberOfPreviewRows rows from the underlying iterator,
   * optionally consuming a header row first, pads/truncates every row to the
   * detected column count, and detects a datatype per column.
   *
   * @return header definitions plus the preview rows
   * @throws NoSuchElementException when the input contains no data rows
   */
  @Override
  public PreviewData parsePreview() {
    LOG.info("generating preview for : {}", this.parseOptions );

    ArrayList<Row> previewRows;
    List<ColumnDescription> header;

    try {
      numberOfPreviewRows = (Integer) parseOptions.getOption(ParseOptions.OPTIONS_NUMBER_OF_PREVIEW_ROWS);
    } catch (Exception e) {
      // option missing or not an Integer: keep the default of 10
      // fix: message previously said "preview columns" for the preview-rows option
      LOG.debug("Illegal number of preview rows supplied {}",parseOptions.getOption(ParseOptions.OPTIONS_NUMBER_OF_PREVIEW_ROWS) );
    }

    int numberOfRows = numberOfPreviewRows;
    previewRows = new ArrayList<>(numberOfPreviewRows);

    Row headerRow = null;
    Integer numOfCols = null;

    // header must be consumed before the first data row is read
    if (parseOptions.getOption(ParseOptions.OPTIONS_HEADER) != null &&
      ( parseOptions.getOption(ParseOptions.OPTIONS_HEADER).equals(ParseOptions.HEADER.FIRST_RECORD.toString()) ||
        parseOptions.getOption(ParseOptions.OPTIONS_HEADER).equals(ParseOptions.HEADER.EMBEDDED.toString())
      )) {
      headerRow = extractHeader();
      numOfCols = headerRow.getRow().length;
    }

    Row r;
    if (iterator().hasNext()) {
      r = iterator().next();
      if( null == numOfCols ) {
        // no header: column count comes from the first data row
        numOfCols = r.getRow().length;
      }
    } else {
      LOG.error("No rows found in the file. returning error.");
      throw new NoSuchElementException("No rows in the file.");
    }

    while (true) {
      // normalize the row width: adds null if fewer columns detected and drops extra columns if any
      Object[] values = r.getRow();
      Object[] newValues= new Object[numOfCols];

      for (int colNum = 0; colNum < numOfCols; colNum++) {
        if(colNum < values.length) {
          newValues[colNum] = values[colNum];
        }else{
          newValues[colNum] = null;
        }
      }

      previewRows.add(new Row(newValues));

      numberOfRows--;
      if (numberOfRows <= 0 || !iterator().hasNext())
        break;

      r = iterator().next();
    }

    if (previewRows.size() <= 0) {
      LOG.error("No rows found in the file. returning error.");
      throw new NoSuchElementException("Does not contain any rows.");
    }

    // find data types.
    header = generateHeader(headerRow,previewRows,numOfCols);

    return new PreviewData(header,previewRows);
  }

  /**
   * Builds one ColumnDescription per column: name taken from the header row
   * when present (its length equals numOfCols by construction above), else
   * synthesized as "column1", "column2", ...; type detected from the preview rows.
   */
  private List<ColumnDescription> generateHeader(Row headerRow,List<Row> previewRows, int numOfCols) {
    List<ColumnDescription> header = new ArrayList<>();

    for (int colNum = 0; colNum < numOfCols; colNum++) {
      ColumnDescription.DataTypes type = getLikelyDataType(previewRows,colNum);
      LOG.info("datatype detected for column {} : {}", colNum, type);

      String colName = COLUMN_PREFIX + (colNum + 1);
      if (null != headerRow)
        colName = (String) headerRow.getRow()[colNum];

      ColumnDescription cd = new ColumnDescriptionImpl(colName, type.toString(), colNum);
      header.add(cd);
    }

    LOG.debug("return headers : {} ", header);
    return header;
  }
}
package org.apache.ambari.view.hive20.resources.uploads.parsers;

import org.apache.ambari.view.hive20.client.ColumnDescription;
import org.apache.ambari.view.hive20.client.Row;

import java.util.List;

/**
 * Value object carrying the result of a parser preview: the detected column
 * descriptions plus the first few data rows.
 */
public class PreviewData {
  // detected column name/type descriptions, one per column
  private List<ColumnDescription> columns;
  // the leading data rows shown in the preview
  private List<Row> rows;

  public PreviewData() {
  }

  public PreviewData(List<ColumnDescription> header, List<Row> previewRows) {
    this.columns = header;
    this.rows = previewRows;
  }

  public List<ColumnDescription> getHeader() {
    return columns;
  }

  public void setHeader(List<ColumnDescription> header) {
    this.columns = header;
  }

  public List<Row> getPreviewRows() {
    return rows;
  }

  public void setPreviewRows(List<Row> previewRows) {
    this.rows = previewRows;
  }
}
package org.apache.ambari.view.hive20.resources.uploads.parsers;

import org.apache.ambari.view.hive20.client.Row;

import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;

/**
 * Converts the ordered map of values produced by the JSON/XML parsers into
 * positional {@link Row} objects, using the keys of the first record (obtained
 * via {@link RowMapIterator#peek()}) as the column order.
 */
public class RowIterator implements Iterator<Row> {

  private final LinkedList<String> headers;
  private final RowMapIterator mapIterator;

  /**
   * Creates a row iterator over the map values in the given RowMapIterator,
   * remembering the first record's keys as the header.
   *
   * @param iterator source of ordered column-name/value maps
   */
  public RowIterator(RowMapIterator iterator) {
    this.mapIterator = iterator;
    this.headers = new LinkedList<>();
    LinkedHashMap<String, String> firstRecord = iterator.peek();
    if (firstRecord != null) {
      this.headers.addAll(firstRecord.keySet());
    }
  }

  @Override
  public boolean hasNext() {
    return mapIterator.hasNext();
  }

  @Override
  public Row next() {
    LinkedHashMap<String, String> record = this.mapIterator.next();
    return (record == null) ? null : convertToRow(record);
  }

  @Override
  public void remove() {
    mapIterator.remove();
  }

  /**
   * @return ordered collection of header names taken from the first record's keys
   */
  public LinkedList<String> extractHeaders() {
    return headers;
  }

  /**
   * Converts one record map into a Row, ordering values by the header list;
   * columns missing from the record come out as null.
   */
  private Row convertToRow(LinkedHashMap<String, String> record) {
    Object[] data = new Object[headers.size()];
    int col = 0;
    for (String name : headers) {
      String value = record.get(name);
      // trim to remove any \n etc which is used as a separator for rows in TableDataReader
      data[col++] = (value == null) ? null : value.trim();
    }
    return new Row(data);
  }

}
http://git-wip-us.apache.org/repos/asf/ambari/blob/853a1ce7/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/uploads/parsers/RowMapIterator.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/uploads/parsers/RowMapIterator.java b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/uploads/parsers/RowMapIterator.java new file mode 100644 index 0000000..f429157 --- /dev/null +++ b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/uploads/parsers/RowMapIterator.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package org.apache.ambari.view.hive20.resources.uploads.parsers;

import java.util.Iterator;
import java.util.LinkedHashMap;

/**
 * Iterator which generates an ordered map of column name to value for each row
 * read from streams like JSON and XML. The LinkedHashMap preserves the column
 * order found in the input so rows can later be flattened positionally.
 */
public interface RowMapIterator extends Iterator<LinkedHashMap<String, String>> {
  /**
   * Looks at the next row without consuming it; used e.g. to derive the header
   * from the first record's keys.
   *
   * @return the next row's ordered column-name/value map, or {@code null} when
   *         exhausted — TODO confirm the null contract holds for all implementations
   */
  LinkedHashMap<String, String> peek() ;
}
package org.apache.ambari.view.hive20.resources.uploads.parsers.csv.commonscsv;

import org.apache.ambari.view.hive20.client.Row;
import org.apache.commons.csv.CSVRecord;

import java.util.Iterator;

/**
 * Adapts an iterator of commons-csv {@link CSVRecord}s into an iterator of
 * {@link Row} objects, copying each record's fields positionally.
 */
class CSVIterator implements Iterator<Row> {

  private final Iterator<CSVRecord> records;

  public CSVIterator(Iterator<CSVRecord> iterator) {
    this.records = iterator;
  }

  @Override
  public boolean hasNext() {
    return records.hasNext();
  }

  @Override
  public Row next() {
    CSVRecord record = records.next();
    int size = record.size();
    Object[] values = new Object[size];
    int i = 0;
    while (i < size) {
      values[i] = record.get(i);
      i++;
    }
    return new Row(values);
  }

  @Override
  public void remove() {
    this.records.remove();
  }
}
package org.apache.ambari.view.hive20.resources.uploads.parsers.csv.commonscsv;

import org.apache.ambari.view.hive20.client.Row;
import org.apache.ambari.view.hive20.resources.uploads.parsers.ParseOptions;
import org.apache.ambari.view.hive20.resources.uploads.parsers.Parser;
import org.apache.commons.csv.CSVFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.Reader;
import java.util.Iterator;

/**
 * Parses a Reader containing a CSV stream with commons-csv: extracts the header
 * (when configured), iterates the data rows, and lets the base class detect
 * column datatypes.
 */
public class CSVParser extends Parser {
  private final static Logger LOG =
    LoggerFactory.getLogger(CSVParser.class);

  private final CSVIterator iterator;
  private final org.apache.commons.csv.CSVParser parser;

  public CSVParser(Reader reader, ParseOptions parseOptions) throws IOException {
    super(reader, parseOptions);
    this.parser = new org.apache.commons.csv.CSVParser(this.reader, buildFormat(parseOptions));
    this.iterator = new CSVIterator(this.parser.iterator());
  }

  /**
   * Translates the relevant ParseOptions into a commons-csv CSVFormat.
   * CSVFormat is immutable, so each withX call returns a new instance.
   */
  private static CSVFormat buildFormat(ParseOptions parseOptions) {
    CSVFormat format = CSVFormat.DEFAULT;

    String optHeader = (String) parseOptions.getOption(ParseOptions.OPTIONS_HEADER);
    if (ParseOptions.HEADER.FIRST_RECORD.toString().equals(optHeader)) {
      // first record becomes the header and is skipped by the data iterator
      format = format.withHeader();
    } else if (ParseOptions.HEADER.PROVIDED_BY_USER.toString().equals(optHeader)) {
      String[] headers = (String[]) parseOptions.getOption(ParseOptions.OPTIONS_HEADERS);
      format = format.withHeader(headers);
    }

    Character delimiter = (Character) parseOptions.getOption(ParseOptions.OPTIONS_CSV_DELIMITER);
    if (null != delimiter) {
      LOG.info("setting delimiter as {}", delimiter);
      format = format.withDelimiter(delimiter);
    }

    Character quote = (Character) parseOptions.getOption(ParseOptions.OPTIONS_CSV_QUOTE);
    if (null != quote) {
      LOG.info("setting Quote char : {}", quote);
      format = format.withQuote(quote);
    }

    Character escape = (Character) parseOptions.getOption(ParseOptions.OPTIONS_CSV_ESCAPE_CHAR);
    if (null != escape) {
      LOG.info("setting escape as {}", escape);
      format = format.withEscape(escape);
    }

    return format;
  }

  @Override
  public Row extractHeader() {
    // NOTE(review): getHeaderMap() is null unless a header was configured on
    // the format above — callers invoke this only when a header option is set
    return new Row(parser.getHeaderMap().keySet().toArray());
  }

  @Override
  public void close() throws Exception {
    this.parser.close();
  }

  public Iterator<Row> iterator() {
    return iterator; // only one iterator per parser.
  }
}
package org.apache.ambari.view.hive20.resources.uploads.parsers.csv.opencsv;

import org.apache.ambari.view.hive20.client.Row;

import java.util.Iterator;

/**
 * Adapts an iterator of opencsv String[] records into an iterator of
 * {@link Row} objects, copying each record's fields positionally.
 */
class OpenCSVIterator implements Iterator<Row> {

  private final Iterator<String[]> records;

  public OpenCSVIterator(Iterator<String[]> iterator) {
    this.records = iterator;
  }

  @Override
  public boolean hasNext() {
    return records.hasNext();
  }

  @Override
  public Row next() {
    String[] fields = records.next();
    Object[] values = new Object[fields.length];
    // widen String[] -> Object[] without a per-element loop
    System.arraycopy(fields, 0, values, 0, fields.length);
    return new Row(values);
  }

  @Override
  public void remove() {
    this.records.remove();
  }
}
package org.apache.ambari.view.hive20.resources.uploads.parsers.csv.opencsv;

import com.opencsv.CSVParserBuilder;
import com.opencsv.CSVReader;
import com.opencsv.CSVReaderBuilder;
import org.apache.ambari.view.hive20.client.Row;
import org.apache.ambari.view.hive20.resources.uploads.parsers.ParseOptions;
import org.apache.ambari.view.hive20.resources.uploads.parsers.Parser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.Reader;
import java.util.Iterator;

/**
 * Parses a Reader containing a CSV stream with opencsv: optionally consumes
 * the first record as the header, then iterates data rows.
 */
public class OpenCSVParser extends Parser {
  private final static Logger LOG =
    LoggerFactory.getLogger(OpenCSVParser.class);

  private Row headerRow;
  private final OpenCSVIterator iterator;
  private final CSVReader csvReader;

  public OpenCSVParser(Reader reader, ParseOptions parseOptions) throws IOException {
    super(reader, parseOptions);

    this.csvReader = new CSVReaderBuilder(reader)
      .withCSVParser(buildCsvParser(parseOptions))
      .build();
    this.iterator = new OpenCSVIterator(this.csvReader.iterator());

    // when the first record is the header, consume it up front so the data
    // iterator starts at the first data row
    String optHeader = (String) parseOptions.getOption(ParseOptions.OPTIONS_HEADER);
    if (ParseOptions.HEADER.FIRST_RECORD.toString().equals(optHeader)) {
      this.headerRow = iterator.hasNext() ? iterator.next() : new Row(new Object[]{});
    }
  }

  /** Translates the relevant ParseOptions into an opencsv CSVParser. */
  private static com.opencsv.CSVParser buildCsvParser(ParseOptions parseOptions) {
    CSVParserBuilder builder = new CSVParserBuilder();

    Character delimiter = (Character) parseOptions.getOption(ParseOptions.OPTIONS_CSV_DELIMITER);
    if (null != delimiter) {
      LOG.info("setting delimiter as {}", delimiter);
      builder = builder.withSeparator(delimiter);
    }

    Character quote = (Character) parseOptions.getOption(ParseOptions.OPTIONS_CSV_QUOTE);
    if (null != quote) {
      LOG.info("setting Quote char : {}", quote);
      builder = builder.withQuoteChar(quote);
    }

    Character escapeChar = (Character) parseOptions.getOption(ParseOptions.OPTIONS_CSV_ESCAPE_CHAR);
    if (null != escapeChar) {
      LOG.info("setting escapeChar : {}", escapeChar);
      builder = builder.withEscapeChar(escapeChar);
    }

    return builder.build();
  }

  @Override
  public Row extractHeader() {
    // null when no header option was set — TODO confirm callers guard on that
    return headerRow;
  }

  @Override
  public void close() throws Exception {
    this.csvReader.close();
  }

  public Iterator<Row> iterator() {
    return iterator; // only one iterator per parser.
  }
}
+ */ + +package org.apache.ambari.view.hive20.resources.uploads.parsers.json; + +import com.google.gson.stream.JsonReader; +import com.google.gson.stream.JsonToken; +import org.apache.ambari.view.hive20.resources.uploads.parsers.EndOfDocumentException; +import org.apache.ambari.view.hive20.resources.uploads.parsers.RowMapIterator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.LinkedHashMap; + +/** + * iterates over the JsonReader and reads creates row data + * assumes the array of json objects. + * eg : [ { "col1Name" : "value-1-1", "col2Name" : "value-1-2"}, { "col1Name" : "value-2-1", "col2Name" : "value-2-2"}] + */ +class JSONIterator implements RowMapIterator { + + protected final static Logger LOG = + LoggerFactory.getLogger(JSONIterator.class); + + private LinkedHashMap<String, String> nextObject = null; + + private LinkedHashMap<String, String> readNextObject(JsonReader reader) throws IOException, EndOfDocumentException { + LinkedHashMap<String, String> row = new LinkedHashMap<>(); + boolean objectStarted = false; + boolean shouldBeName = false; + String currentName = null; + + while (true) { + JsonToken token = reader.peek(); + switch (token) { + case BEGIN_ARRAY: + throw new IllegalArgumentException("Row data cannot have an array."); + case END_ARRAY: + throw new EndOfDocumentException("End of Json Array document."); + case BEGIN_OBJECT: + if (objectStarted == true) { + throw new IllegalArgumentException("Nested objects not supported."); + } + if (shouldBeName == true) { + throw new IllegalArgumentException("name expected, got begin_object"); + } + objectStarted = true; + shouldBeName = true; + reader.beginObject(); + break; + case END_OBJECT: + if (shouldBeName == false) { + throw new IllegalArgumentException("value expected, got end_object"); + } + reader.endObject(); + return row; + case NAME: + if (shouldBeName == false) { + throw new IllegalArgumentException("name not expected at this 
point."); + } + shouldBeName = false; + currentName = reader.nextName(); + break; + case NUMBER: + case STRING: + if (shouldBeName == true) { + throw new IllegalArgumentException("value not expected at this point."); + } + String n = reader.nextString(); + row.put(currentName, n); + shouldBeName = true; + break; + case BOOLEAN: + if (shouldBeName == true) { + throw new IllegalArgumentException("value not expected at this point."); + } + String b = String.valueOf(reader.nextBoolean()); + row.put(currentName, b); + shouldBeName = true; + break; + case NULL: + if (shouldBeName == true) { + throw new IllegalArgumentException("value not expected at this point."); + } + reader.nextNull(); + row.put(currentName, ""); + shouldBeName = true; + break; + case END_DOCUMENT: + return row; + + default: + throw new IllegalArgumentException("Illegal token detected inside json: token : " + token.toString()); + } + } + } + + private JsonReader reader; + + public JSONIterator(JsonReader reader) throws IOException { + this.reader = reader; + // test the start of array + JsonToken jt = reader.peek(); + if (jt != JsonToken.BEGIN_ARRAY) { + throw new IllegalArgumentException("Expected the whole document to contain a single JsonArray."); + } + + reader.beginArray(); // read the start of array + try { + nextObject = readNextObject(this.reader); + } catch (EndOfDocumentException e) { + } + } + + @Override + public boolean hasNext() { + return null != nextObject; + } + + public LinkedHashMap<String, String> peek() { + return nextObject; + } + + @Override + public LinkedHashMap<String, String> next() { + LinkedHashMap<String, String> currObject = nextObject; + try { + nextObject = readNextObject(this.reader); + } catch (EndOfDocumentException e) { + LOG.debug("End of Json document reached with next character ending the JSON Array."); + nextObject = null; + } catch (Exception e){ + // for any other exception throw error right away + throw new IllegalArgumentException(e); + } + return 
currObject; + } + + @Override + public void remove() { + // no operation. + LOG.info("No operation when remove called on JSONIterator."); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/853a1ce7/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/uploads/parsers/json/JSONParser.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/uploads/parsers/json/JSONParser.java b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/uploads/parsers/json/JSONParser.java new file mode 100644 index 0000000..58dae9e --- /dev/null +++ b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/uploads/parsers/json/JSONParser.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ambari.view.hive20.resources.uploads.parsers.json; + +import com.google.gson.stream.JsonReader; +import org.apache.ambari.view.hive20.client.Row; +import org.apache.ambari.view.hive20.resources.uploads.parsers.ParseOptions; +import org.apache.ambari.view.hive20.resources.uploads.parsers.Parser; +import org.apache.ambari.view.hive20.resources.uploads.parsers.RowIterator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.Reader; +import java.util.Collection; +import java.util.Iterator; + + +/** + * Parses the input data from reader as JSON and provides iterator for rows. + * + * Expects the input reader to contains a JsonArray in which each element is a JsonObject + * corresponding to the row. + * eg. : + * + * [ + * {row1-col1, row1-col2, row1-col3}, + * {row2-col1, row2-col2, row2-col3} + * ] + * + */ +public class JSONParser extends Parser { + + protected final static Logger LOG = + LoggerFactory.getLogger(JSONParser.class); + + private RowIterator iterator; + private JsonReader jsonReader; + private JSONIterator JSONIterator; + + public JSONParser(Reader reader, ParseOptions parseOptions) throws IOException { + super(reader, parseOptions); + this.jsonReader = new JsonReader(this.reader); + JSONIterator = new JSONIterator(this.jsonReader); + iterator = new RowIterator(JSONIterator); + } + + @Override + public Row extractHeader() { + Collection<String> headers = this.iterator.extractHeaders(); + Object[] objs = new Object[headers.size()]; + Iterator<String> iterator = headers.iterator(); + for(int i = 0 ; i < headers.size() ; i++){ + objs[i] = iterator.next(); + } + + return new Row(objs); + } + + @Override + public void close() throws Exception { + this.jsonReader.close(); + } + + @Override + public Iterator<Row> iterator() { + return iterator; + } +} \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/ambari/blob/853a1ce7/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/uploads/parsers/xml/XMLIterator.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/uploads/parsers/xml/XMLIterator.java b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/uploads/parsers/xml/XMLIterator.java new file mode 100644 index 0000000..c969b69 --- /dev/null +++ b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/uploads/parsers/xml/XMLIterator.java @@ -0,0 +1,195 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ambari.view.hive20.resources.uploads.parsers.xml; + +import org.apache.ambari.view.hive20.resources.uploads.parsers.EndOfDocumentException; +import org.apache.ambari.view.hive20.resources.uploads.parsers.RowMapIterator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.xml.namespace.QName; +import javax.xml.stream.XMLEventReader; +import javax.xml.stream.XMLStreamConstants; +import javax.xml.stream.XMLStreamException; +import javax.xml.stream.events.*; +import java.io.IOException; +import java.util.LinkedHashMap; + +/** + * assumes XML of following format + * <table> + * <row> + * <col name="col1Name">row1-col1-Data</col> + * <col name="col2Name">row1-col2-Data</col> + * <col name="col3Name">row1-col3-Data</col> + * <col name="col4Name">row1-col4-Data</col> + * </row> + * <row> + * <col name="col1Name">row2-col1-Data</col> + * <col name="col2Name">row2-col2-Data</col> + * <col name="col3Name">row2-col3-Data</col> + * <col name="col4Name">row2-col4-Data</col> + * </row> + * </table> + */ +class XMLIterator implements RowMapIterator { + + protected final static Logger LOG = + LoggerFactory.getLogger(XMLIterator.class); + + private LinkedHashMap<String, String> nextObject = null; + private static final String TAG_TABLE = "table"; + private static final String TAG_ROW = "row"; + private static final String TAG_COL = "col"; + private boolean documentStarted = false; + private XMLEventReader reader; + + public XMLIterator(XMLEventReader reader) throws IOException { + this.reader = reader; + try { + nextObject = readNextObject(this.reader); + } catch (EndOfDocumentException e) { + LOG.debug("error : {}", e); + } catch (XMLStreamException e) { + throw new IOException(e); + } + } + + @Override + public boolean hasNext() { + return null != nextObject; + } + + public LinkedHashMap<String, String> peek() { + return nextObject; + } + + @Override + public LinkedHashMap<String, String> next() { + LinkedHashMap<String, String> 
currObject = nextObject; + try { + nextObject = readNextObject(this.reader); + } catch (IOException e) { + LOG.error("Exception occured while reading the next row from XML : {} ", e); + nextObject = null; + } catch (EndOfDocumentException e) { + LOG.debug("End of XML document reached with next character ending the XML."); + nextObject = null; + } catch (XMLStreamException e) { + LOG.error("Exception occured while reading the next row from XML : {} ", e); + nextObject = null; + } + return currObject; + } + + @Override + public void remove() { + // no operation. + LOG.info("No operation when remove called."); + } + + private LinkedHashMap<String, String> readNextObject(XMLEventReader reader) throws IOException, EndOfDocumentException, XMLStreamException { + LinkedHashMap<String, String> row = new LinkedHashMap<>(); + boolean objectStarted = false; + String currentName = null; + + while (true) { + XMLEvent event = reader.nextEvent(); + switch (event.getEventType()) { + case XMLStreamConstants.START_ELEMENT: + StartElement startElement = event.asStartElement(); + String qName = startElement.getName().getLocalPart(); + LOG.debug("startName : {}" , qName); + switch (qName) { + case TAG_TABLE: + if (documentStarted) { + throw new IllegalArgumentException("Cannot have a <table> tag nested inside another <table> tag"); + } else { + documentStarted = true; + } + break; + case TAG_ROW: + if (objectStarted) { + throw new IllegalArgumentException("Cannot have a <row> tag nested inside another <row> tag"); + } else { + objectStarted = true; + } + break; + case TAG_COL: + if (!objectStarted) { + throw new IllegalArgumentException("Stray tag " + qName); + } + Attribute nameAttr = startElement.getAttributeByName( new QName("name")); + if( null == nameAttr ){ + throw new IllegalArgumentException("Missing name attribute in col tag."); + } + currentName = nameAttr.getValue(); + break; + default: + throw new IllegalArgumentException("Illegal start tag " + qName + " encountered."); + } 
+ break; + case XMLStreamConstants.END_ELEMENT: + EndElement endElement = event.asEndElement(); + String name = endElement.getName().getLocalPart(); + LOG.debug("endName : {}", name); + switch (name) { + case TAG_TABLE: + if (!documentStarted) { + throw new IllegalArgumentException("Stray </table> tag."); + } + throw new EndOfDocumentException("End of XML document."); + + case TAG_ROW: + if (!objectStarted) { + throw new IllegalArgumentException("Stray </row> tag."); + } + return row; + + case TAG_COL: + if (!objectStarted) { + throw new IllegalArgumentException("Stray tag " + name); + } + currentName = null; + break; + + default: + throw new IllegalArgumentException("Illegal start ending " + name + " encountered."); + } + break; + case XMLStreamConstants.CHARACTERS: + Characters characters = event.asCharacters(); + if (characters.isWhiteSpace() && currentName == null) + break; + String data = characters.getData(); + LOG.debug("character data : {}", data); + if (currentName == null) { + throw new IllegalArgumentException("Illegal characters outside any tag : " + data); + } else { + String oldData = row.get(currentName); + if (null != oldData) { + data = oldData + data; + } + row.put(currentName, data); + } + break; + } + } + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/853a1ce7/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/uploads/parsers/xml/XMLParser.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/uploads/parsers/xml/XMLParser.java b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/uploads/parsers/xml/XMLParser.java new file mode 100644 index 0000000..abcf2c0 --- /dev/null +++ b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/uploads/parsers/xml/XMLParser.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.view.hive20.resources.uploads.parsers.xml; + +import org.apache.ambari.view.hive20.client.Row; +import org.apache.ambari.view.hive20.resources.uploads.parsers.ParseOptions; +import org.apache.ambari.view.hive20.resources.uploads.parsers.Parser; +import org.apache.ambari.view.hive20.resources.uploads.parsers.RowIterator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.xml.stream.XMLEventReader; +import javax.xml.stream.XMLInputFactory; +import javax.xml.stream.XMLStreamException; +import java.io.IOException; +import java.io.Reader; +import java.util.Collection; +import java.util.Iterator; + +/** + * assumes XML of following format + * <table> + * <row> + * <col name="col1Name">row1-col1-Data</col> + * <col name="col2Name">row1-col2-Data</col> + * <col name="col3Name">row1-col3-Data</col> + * <col name="col4Name">row1-col4-Data</col> + * </row> + * <row> + * <col name="col1Name">row2-col1-Data</col> + * <col name="col2Name">row2-col2-Data</col> + * <col name="col3Name">row2-col3-Data</col> + * <col name="col4Name">row2-col4-Data</col> + * </row> + * </table> + */ +public class XMLParser extends Parser { + + protected final static Logger LOG = + 
LoggerFactory.getLogger(XMLParser.class); + + private RowIterator iterator; + private XMLEventReader xmlReader; + private XMLIterator xmlIterator; + + public XMLParser(Reader reader, ParseOptions parseOptions) throws IOException { + super(reader, parseOptions); + XMLInputFactory factory = XMLInputFactory.newInstance(); + try { + this.xmlReader = factory.createXMLEventReader(reader); + } catch (XMLStreamException e) { + LOG.error("error occurred while creating xml reader : ", e); + throw new IOException("error occurred while creating xml reader : ", e); + } + xmlIterator = new XMLIterator(this.xmlReader); + iterator = new RowIterator(xmlIterator); + } + + @Override + public Row extractHeader() { + Collection<String> headers = this.iterator.extractHeaders(); + Object[] objs = new Object[headers.size()]; + Iterator<String> iterator = headers.iterator(); + for (int i = 0; i < headers.size(); i++) { + objs[i] = iterator.next(); + } + + return new Row(objs); + } + + @Override + public void close() throws Exception { + try { + this.xmlReader.close(); + } catch (XMLStreamException e) { + throw new IOException(e); + } + } + + @Override + public Iterator<Row> iterator() { + return iterator; + } +}
