swaroopak commented on a change in pull request #498: PHOENIX-5258 Add support for parsing CSV header as columns
URL: https://github.com/apache/phoenix/pull/498#discussion_r280538502
##########
File path: phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java
##########
@@ -96,6 +118,86 @@ protected void configureOptions(CommandLine cmdLine, List<ColumnInfo> importColu
                 binaryEncoding);
     }
+    /**
+     * Build up the list of columns to be imported. The list is taken from the command line if
+     * present, otherwise it is taken from the table description.
+     *
+     * @param conn connection to Phoenix
+     * @param cmdLine supplied command line options
+     * @param qualifiedTableName table name (possibly with schema) of the table to be imported
+     * @param conf Configured options
+     * @return the list of columns to be imported
+     */
+    @Override
+    List<ColumnInfo> buildImportColumns(
+            Connection conn, CommandLine cmdLine, String qualifiedTableName, Configuration conf
+    ) throws SQLException, IOException {
+        List<ColumnInfo> columnInfos;
+        if (cmdLine.hasOption(HEADER_OPT.getOpt())) {
+            List<String> parsedColumns = parseCsvHeaders(cmdLine, conf);
+            columnInfos = SchemaUtil.generateColumnInfo(
+                    conn, qualifiedTableName, parsedColumns, true);
+        } else {
+            columnInfos = super.buildImportColumns(conn, cmdLine, qualifiedTableName, conf);
+        }
+        return columnInfos;
+    }
+
+    /**
+     * Parse the header (first line) from the input CSV and return the list of input columns.
+     * @param cmdLine supplied command line options
+     * @param conf Configured options
+     * @return the list of columns to be imported, parsed from the input CSV header
+     * @throws IOException Exception thrown by FileSystem IO
+     */
+    private List<String> parseCsvHeaders(CommandLine cmdLine, Configuration conf) throws IOException {
+        List<String> headerColumns;
+        String inputPaths = cmdLine.getOptionValue(INPUT_PATH_OPT.getOpt());
+        Iterable<String> paths = Splitter.on(",").trimResults().split(inputPaths);
+        List<String> headers = fetchAllHeaders(paths, conf);
+        List<String> uniqueHeaders = headers.stream().distinct().collect(Collectors.toList());
+        if (uniqueHeaders.size() > 1) {
+            throw new IllegalArgumentException(
+                    "Headers in the provided input files differ. All input files must have the same header"
+            );
+        }
+        String header = uniqueHeaders.get(0);
+        headerColumns = Lists.newArrayList(Splitter.on(",").trimResults().split(header));
+        headerColumns.replaceAll(String::toUpperCase);
+        return headerColumns;
+    }
+
+    /**
+     * Fetch the headers from all comma-separated input files provided by the user.
+     * @param paths Iterable instance of the provided input paths
+     * @param conf Configured options
+     * @return The list of headers from all input files
+     * @throws IOException Exception thrown by FileSystem IO
+     */
+    private List<String> fetchAllHeaders(Iterable<String> paths, Configuration conf) throws IOException {
+        List<String> headers = new ArrayList<>();
+        for (String path : paths) {
+            headers.add(fetchCsvHeader(conf, path));
+        }
+        return headers;
+    }
+
+    /**
+     * Fetch the CSV header (first line) from the given HDFS path.
+     * @param conf Configured options
+     * @param path HDFS path to a single input file
+     * @return The header line (first line) from the input file
+     * @throws IOException Exception thrown by FileSystem IO
+     */
+    private String fetchCsvHeader(Configuration conf, String path) throws IOException {
+        FileSystem fs = FileSystem.get(URI.create(path), conf);
Review comment:
Can we use try-with-resources here?
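For example, something along these lines (a minimal sketch of how the body might look, assuming the method only needs the first line; it would pull in java.io.BufferedReader, java.io.InputStreamReader, java.nio.charset.StandardCharsets, org.apache.hadoop.fs.FSDataInputStream, and org.apache.hadoop.fs.Path):

    private String fetchCsvHeader(Configuration conf, String path) throws IOException {
        FileSystem fs = FileSystem.get(URI.create(path), conf);
        // try-with-resources closes the stream even if readLine() throws.
        // The FileSystem itself is deliberately not closed, since
        // FileSystem.get() returns a shared, cached instance.
        try (FSDataInputStream in = fs.open(new Path(path));
                BufferedReader reader = new BufferedReader(
                        new InputStreamReader(in, StandardCharsets.UTF_8))) {
            return reader.readLine(); // null if the file is empty
        }
    }

That way the stream can't leak on an early return or an exception, and the method's contract stays the same for callers.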
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services