Repository: kudu Updated Branches: refs/heads/master 4b045c1d3 -> 5d53a3b71
kudu client tools for hadoop and spark import/export(csv,parquet,avro) Change-Id: If462af948651f3869b444e82151c3559fde19142 Reviewed-on: http://gerrit.cloudera.org:8080/7421 Reviewed-by: Jean-Daniel Cryans <jdcry...@apache.org> Tested-by: Jean-Daniel Cryans <jdcry...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/kudu/repo Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/5d53a3b7 Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/5d53a3b7 Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/5d53a3b7 Branch: refs/heads/master Commit: 5d53a3b7146c6bcf330ddae036cfc42eeb7b7849 Parents: 4b045c1 Author: sany <sanysand...@gmail.com> Authored: Fri Jul 14 16:54:57 2017 +0530 Committer: Jean-Daniel Cryans <jdcry...@apache.org> Committed: Wed Aug 9 22:02:13 2017 +0000 ---------------------------------------------------------------------- java/kudu-client-tools/pom.xml | 5 + .../apache/kudu/mapreduce/tools/ExportCsv.java | 109 +++++++++++ .../kudu/mapreduce/tools/ExportCsvMapper.java | 114 ++++++++++++ .../kudu/mapreduce/tools/ImportParquet.java | 180 +++++++++++++++++++ .../mapreduce/tools/ImportParquetMapper.java | 113 ++++++++++++ .../mapreduce/tools/ParquetReadSupport.java | 36 ++++ .../kudu/mapreduce/tools/ITExportCsv.java | 88 +++++++++ .../kudu/mapreduce/tools/ITImportParquet.java | 147 +++++++++++++++ .../tools/ITImportParquetPreCheck.java | 151 ++++++++++++++++ java/kudu-spark-tools/pom.xml | 8 +- .../kudu/spark/tools/ImportExportFiles.scala | 159 ++++++++++++++++ .../spark/tools/TestImportExportFiles.scala | 82 +++++++++ java/pom.xml | 15 +- 13 files changed, 1200 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kudu/blob/5d53a3b7/java/kudu-client-tools/pom.xml ---------------------------------------------------------------------- diff --git a/java/kudu-client-tools/pom.xml b/java/kudu-client-tools/pom.xml index d4908fa..65ac4e3 100644 --- a/java/kudu-client-tools/pom.xml +++ b/java/kudu-client-tools/pom.xml @@ -86,6 +86,11 @@ <version>${slf4j.version}</version> <scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.parquet</groupId> + <artifactId>parquet-hadoop</artifactId> + <version>${parquet.version}</version> + </dependency> </dependencies> <build> http://git-wip-us.apache.org/repos/asf/kudu/blob/5d53a3b7/java/kudu-client-tools/src/main/java/org/apache/kudu/mapreduce/tools/ExportCsv.java ---------------------------------------------------------------------- diff --git a/java/kudu-client-tools/src/main/java/org/apache/kudu/mapreduce/tools/ExportCsv.java b/java/kudu-client-tools/src/main/java/org/apache/kudu/mapreduce/tools/ExportCsv.java new file mode 100644 index 0000000..3460a50 --- /dev/null +++ b/java/kudu-client-tools/src/main/java/org/apache/kudu/mapreduce/tools/ExportCsv.java @@ -0,0 +1,109 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. See accompanying LICENSE file. 
+ */
+
+package org.apache.kudu.mapreduce.tools;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+
+import org.apache.kudu.mapreduce.CommandLineParser;
+import org.apache.kudu.mapreduce.KuduTableInputFormat;
+import org.apache.kudu.mapreduce.KuduTableMapReduceUtil;
+
+/**
+ * Map-only job that reads Kudu rows and writes them into a CSV file.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public class ExportCsv extends Configured implements Tool {
+
+  static final String NAME = "exportcsv";
+  static final String DEFAULT_SEPARATOR = "\t";
+  static final String SEPARATOR_CONF_KEY = "exportcsv.separator";
+  static final String JOB_NAME_CONF_KEY = "exportcsv.job.name";
+  static final String COLUMNS_NAMES_KEY = "exportcsv.column.names";
+
+  /**
+   * Sets up the actual job.
+   *
+   * @param conf the current configuration
+   * @param args the command line parameters
+   * @return the newly created job
+   * @throws java.io.IOException when setting up the job fails
+   */
+  @SuppressWarnings("deprecation")
+  public static Job createSubmittableJob(Configuration conf, String[] args)
+      throws IOException, ClassNotFoundException {
+
+    Class<ExportCsvMapper> mapperClass = ExportCsvMapper.class;
+    conf.set(COLUMNS_NAMES_KEY, args[0]);
+    String tableName = args[1];
+    final Path outputDir = new Path(args[2]);
+
+    String jobName = conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName);
+    Job job = new Job(conf, jobName);
+    job.setJarByClass(mapperClass);
+    job.setInputFormatClass(KuduTableInputFormat.class);
+    new KuduTableMapReduceUtil.TableInputFormatConfiguratorWithCommandLineParser(job, tableName,
+        args[0]).configure();
+    job.setOutputFormatClass(TextOutputFormat.class);
+    job.setMapperClass(mapperClass);
+    job.setNumReduceTasks(0);
+    FileOutputFormat.setOutputPath(job, outputDir);
+    return job;
+  }
+
+  /*
+   * @param errorMsg error message. Can be null.
+   */
+  private static void usage(final String errorMsg) {
+    if (errorMsg != null && errorMsg.length() > 0) {
+      System.err.println("ERROR: " + errorMsg);
+    }
+    String usage = "Usage: " + NAME + " <colA,colB,colC> <table.name> <output.dir>\n\n" +
+        "Exports the given table and columns into the specified output path.\n" +
+        "The column names of the Kudu table must be specified in the form of\n" +
+        "comma-separated column names.\n" +
+        "Other options that may be specified with -D include:\n" +
+        "'-D" + SEPARATOR_CONF_KEY + "=|' - e.g. separate on pipes instead of tabs\n" +
+        "-D" + JOB_NAME_CONF_KEY + "=jobName - use the specified mapreduce job name for the " +
+        "export.\n" + CommandLineParser.getHelpSnippet();
+    System.err.println(usage);
+  }
+
+  @Override
+  public int run(String[] otherArgs) throws Exception {
+    if (otherArgs.length < 3) {
+      usage("Wrong number of arguments: " + otherArgs.length);
+      return -1;
+    }
+    Job job = createSubmittableJob(getConf(), otherArgs);
+    return job.waitForCompletion(true) ? 0 : 1;
+  }
+
+  public static void main(String[] args) throws Exception {
+    int status = ToolRunner.run(new ExportCsv(), args);
+    System.exit(status);
+  }
+}
http://git-wip-us.apache.org/repos/asf/kudu/blob/5d53a3b7/java/kudu-client-tools/src/main/java/org/apache/kudu/mapreduce/tools/ExportCsvMapper.java
----------------------------------------------------------------------
diff --git a/java/kudu-client-tools/src/main/java/org/apache/kudu/mapreduce/tools/ExportCsvMapper.java b/java/kudu-client-tools/src/main/java/org/apache/kudu/mapreduce/tools/ExportCsvMapper.java
new file mode 100644
index 0000000..bbe855c
--- /dev/null
+++ b/java/kudu-client-tools/src/main/java/org/apache/kudu/mapreduce/tools/ExportCsvMapper.java
@@ -0,0 +1,114 @@
+/**
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. See accompanying LICENSE file.
+ */
+
+package org.apache.kudu.mapreduce.tools;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Schema;
+import org.apache.kudu.client.Bytes;
+import org.apache.kudu.client.RowResult;
+
+/**
+ * Mapper that ingests Kudu rows and turns them into CSV lines.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class ExportCsvMapper extends Mapper<NullWritable, RowResult, NullWritable, Text> {
+
+  private static final NullWritable NULL_KEY = NullWritable.get();
+
+  /** Column separator */
+  private String separator;
+
+  private Schema schema;
+
+  /**
+   * Handles initializing this class with objects specific to it.
+   */
+  @Override
+  protected void setup(Context context) {
+    Configuration conf = context.getConfiguration();
+    this.separator = conf.get(ExportCsv.SEPARATOR_CONF_KEY, ExportCsv.DEFAULT_SEPARATOR);
+  }
+
+  /**
+   * Converts a Kudu RowResult into a line of CSV text.
+ */ + @Override + public void map(NullWritable key, RowResult value, Context context) throws IOException { + this.schema = value.getSchema(); + try { + context.write(NULL_KEY, new Text(rowResultToString(value))); + } catch (InterruptedException e) { + throw new IOException("Failing task since it was interrupted", e); + } + } + + private String rowResultToString(RowResult value) { + StringBuilder buf = new StringBuilder(); + for (int i = 0; i < schema.getColumnCount(); i++) { + ColumnSchema col = schema.getColumnByIndex(i); + if (i != 0) { + buf.append(this.separator); + } + + switch (col.getType()) { + case INT8: + buf.append(value.getByte(i)); + break; + case INT16: + buf.append(value.getShort(i)); + break; + case INT32: + buf.append(value.getInt(i)); + break; + case INT64: + buf.append(value.getLong(i)); + break; + case STRING: + buf.append(value.getString(i)); + break; + case BINARY: + buf.append(Bytes.pretty(value.getBinaryCopy(i))); + break; + case FLOAT: + buf.append(value.getFloat(i)); + break; + case DOUBLE: + buf.append(value.getDouble(i)); + break; + case BOOL: + buf.append(value.getBoolean(i)); + break; + case UNIXTIME_MICROS: + buf.append(value.getLong(i)); + break; + default: + buf.append("<unknown type!>"); + break; + } + } + return buf.toString(); + } +} http://git-wip-us.apache.org/repos/asf/kudu/blob/5d53a3b7/java/kudu-client-tools/src/main/java/org/apache/kudu/mapreduce/tools/ImportParquet.java ---------------------------------------------------------------------- diff --git a/java/kudu-client-tools/src/main/java/org/apache/kudu/mapreduce/tools/ImportParquet.java b/java/kudu-client-tools/src/main/java/org/apache/kudu/mapreduce/tools/ImportParquet.java new file mode 100644 index 0000000..eff500d --- /dev/null +++ b/java/kudu-client-tools/src/main/java/org/apache/kudu/mapreduce/tools/ImportParquet.java @@ -0,0 +1,180 @@ +/** + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. See accompanying LICENSE file. 
+ */
+
+package org.apache.kudu.mapreduce.tools;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.hadoop.Footer;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.ParquetInputFormat;
+import org.apache.parquet.hadoop.example.GroupWriteSupport;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Type;
+import org.apache.kudu.client.KuduClient;
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.mapreduce.CommandLineParser;
+import org.apache.kudu.mapreduce.KuduTableMapReduceUtil;
+
+/**
+ * Map-only job that reads Apache Parquet files and inserts them into a single Kudu table.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public class ImportParquet extends Configured implements Tool {
+
+  static final String NAME = "importparquet";
+  static final String JOB_NAME_CONF_KEY = "importparquet.job.name";
+  static final String PARQUET_INPUT_SCHEMA = "importparquet.input.schema";
+
+  /**
+   * Sets up the actual job.
+   *
+   * @param conf the current configuration
+   * @param args the command line parameters
+   * @return the newly created job
+   * @throws java.io.IOException when setting up the job fails
+   */
+  @SuppressWarnings("deprecation")
+  public static Job createSubmittableJob(Configuration conf, String[] args)
+      throws IOException, ClassNotFoundException {
+
+    final String tableName = args[0];
+    Path inputDir = new Path(args[1]);
+
+    List<Footer> footers = new ArrayList<Footer>();
+    footers.addAll(ParquetFileReader.readFooters(conf, inputDir));
+
+    MessageType schema = footers.get(0).getParquetMetadata().getFileMetaData().getSchema();
+    GroupWriteSupport.setSchema(schema, conf);
+    conf.set(PARQUET_INPUT_SCHEMA, schema.toString());
+
+    String jobName = conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName);
+    Job job = new Job(conf, jobName);
+    job.setJarByClass(ImportParquet.class);
+    job.setMapperClass(ImportParquetMapper.class);
+    job.setNumReduceTasks(0);
+    job.setInputFormatClass(ParquetInputFormat.class);
+    ParquetInputFormat.setReadSupportClass(job, ParquetReadSupport.class);
+    ParquetInputFormat.setInputPaths(job, inputDir);
+
+    CommandLineParser cmdLineParser = new CommandLineParser(conf);
+    KuduClient client = cmdLineParser.getClient();
+    KuduTable table = client.openTable(tableName);
+
+    // Pre-flight checks of input parquet schema and table schema.
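+    // Every Kudu column must be present in the Parquet schema with the
+    // matching primitive type (see getTypeName() below), so the job fails
+    // fast at setup time rather than on the first bad row in a map task.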
+    for (ColumnSchema columnSchema : table.getSchema().getColumns()) {
+      if (schema.containsField(columnSchema.getName())) {
+        if (!schema.getType(columnSchema.getName()).asPrimitiveType().getPrimitiveTypeName()
+            .equals(getTypeName(columnSchema.getType()))) {
+          throw new IllegalArgumentException("The column " + columnSchema.getName() +
+              " does not have the expected Parquet type " +
+              getTypeName(columnSchema.getType()));
+        }
+      } else {
+        throw new IllegalArgumentException("The column " + columnSchema.getName() +
+            " does not exist in Parquet schema");
+      }
+    }
+    // Kudu doesn't support Parquet's TIMESTAMP, which the common Parquet
+    // writers store as the INT96 primitive type, so reject INT96 columns.
+    Iterator<ColumnDescriptor> fields = schema.getColumns().iterator();
+    while (fields.hasNext()) {
+      ColumnDescriptor field = fields.next();
+      if (field.getType() == PrimitiveType.PrimitiveTypeName.INT96) {
+        throw new IllegalArgumentException("The " + field.getType() +
+            " Parquet type is not supported in Kudu");
+      }
+    }
+
+    FileInputFormat.setInputPaths(job, inputDir);
+    new KuduTableMapReduceUtil.TableOutputFormatConfiguratorWithCommandLineParser(
+        job,
+        tableName)
+        .configure();
+    return job;
+  }
+
+  private static PrimitiveType.PrimitiveTypeName getTypeName(Type type) {
+    switch (type) {
+      case BOOL:
+        return PrimitiveType.PrimitiveTypeName.BOOLEAN;
+      case INT8:
+        return PrimitiveType.PrimitiveTypeName.INT32;
+      case INT16:
+        return PrimitiveType.PrimitiveTypeName.INT64;
+      case INT32:
+        return PrimitiveType.PrimitiveTypeName.INT32;
+      case INT64:
+        return PrimitiveType.PrimitiveTypeName.INT64;
+      case STRING:
+        return PrimitiveType.PrimitiveTypeName.BINARY;
+      case FLOAT:
+        return PrimitiveType.PrimitiveTypeName.FLOAT;
+      case DOUBLE:
+        return PrimitiveType.PrimitiveTypeName.DOUBLE;
+      default:
+        throw new IllegalArgumentException("Type " + type.getName() + " not recognized");
+    }
+  }
+
+  /*
+   * @param errorMsg error message. Can be null.
+   */
+  private static void usage(final String errorMsg) {
+    if (errorMsg != null && errorMsg.length() > 0) {
+      System.err.println("ERROR: " + errorMsg);
+    }
+    String usage =
+        "Usage: " + NAME + " <table.name> <input.dir>\n\n" +
+        "Imports the given input directory of Apache Parquet data into the specified table.\n" +
+        "Other options that may be specified with -D include:\n" +
+        "-D" + JOB_NAME_CONF_KEY + "=jobName - use the specified mapreduce job name for the " +
+        "import.\n" + CommandLineParser.getHelpSnippet();
+
+    System.err.println(usage);
+  }
+
+  @Override
+  public int run(String[] otherArgs) throws Exception {
+    if (otherArgs.length < 2) {
+      usage("Wrong number of arguments: " + otherArgs.length);
+      return -1;
+    }
+    Job job = createSubmittableJob(getConf(), otherArgs);
+    return job.waitForCompletion(true) ? 0 : 1;
+  }
+
+  public static void main(String[] args) throws Exception {
+    int status = ToolRunner.run(new ImportParquet(), args);
+    System.exit(status);
+  }
+}
http://git-wip-us.apache.org/repos/asf/kudu/blob/5d53a3b7/java/kudu-client-tools/src/main/java/org/apache/kudu/mapreduce/tools/ImportParquetMapper.java
----------------------------------------------------------------------
diff --git a/java/kudu-client-tools/src/main/java/org/apache/kudu/mapreduce/tools/ImportParquetMapper.java b/java/kudu-client-tools/src/main/java/org/apache/kudu/mapreduce/tools/ImportParquetMapper.java
new file mode 100644
index 0000000..bf40442
--- /dev/null
+++ b/java/kudu-client-tools/src/main/java/org/apache/kudu/mapreduce/tools/ImportParquetMapper.java
@@ -0,0 +1,113 @@
+/**
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. See accompanying LICENSE file. + */ + +package org.apache.kudu.mapreduce.tools; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.parquet.example.data.Group; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.MessageTypeParser; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.yetus.audience.InterfaceStability; + +import org.apache.kudu.ColumnSchema; +import org.apache.kudu.Schema; +import org.apache.kudu.client.Insert; +import org.apache.kudu.client.KuduTable; +import org.apache.kudu.client.Operation; +import org.apache.kudu.client.PartialRow; +import org.apache.kudu.mapreduce.KuduTableMapReduceUtil; + +/** + * Mapper that ingests Apache Parquet lines and turns them into Kudu Inserts. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class ImportParquetMapper extends Mapper<LongWritable, Group, NullWritable, Operation> { + + private static final NullWritable NULL_KEY = NullWritable.get(); + + private MessageType parquetSchema; + + private KuduTable table; + private Schema schema; + + /** + * Handles initializing this class with objects specific to it (i.e., the parser). + */ + @Override + protected void setup(Context context) { + Configuration conf = context.getConfiguration(); + parquetSchema = MessageTypeParser.parseMessageType(conf.get( + ImportParquet.PARQUET_INPUT_SCHEMA)); + + this.table = KuduTableMapReduceUtil.getTableFromContext(context); + this.schema = this.table.getSchema(); + } + + /** + * Convert a line of Parquet data into a Kudu Insert + */ + @Override + public void map(LongWritable key, Group value, Context context) + throws IOException { + + try { + Insert insert = this.table.newInsert(); + PartialRow row = insert.getRow(); + for (int i = 0; i < parquetSchema.getFields().size(); i++) { + String colName = parquetSchema.getFields().get(i).getName(); + ColumnSchema col = this.schema.getColumn(colName); + String colValue = value.getValueToString(i, 0); + switch (col.getType()) { + case BOOL: + row.addBoolean(colName, Boolean.parseBoolean(colValue)); + break; + case INT8: + row.addByte(colName, Byte.parseByte(colValue)); + break; + case INT16: + row.addShort(colName, Short.parseShort(colValue)); + break; + case INT32: + row.addInt(colName, Integer.parseInt(colValue)); + break; + case INT64: + row.addLong(colName, Long.parseLong(colValue)); + break; + case STRING: + row.addString(colName, colValue); + break; + case FLOAT: + row.addFloat(colName, Float.parseFloat(colValue)); + break; + case DOUBLE: + row.addDouble(colName, Double.parseDouble(colValue)); + break; + default: + throw new IllegalArgumentException("Type " + col.getType() + " not recognized"); + } + } + context.write(NULL_KEY, insert); + } catch (InterruptedException e) { + throw new IOException("Failing task since it was interrupted", e); + } + } +} 
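Both new tools follow the standard Hadoop Tool/ToolRunner pattern, so besides the command line they can be driven from a small Java program. A minimal sketch (the master address, table names, and paths below are illustrative placeholders, mirroring the argument contracts above and the integration tests below):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;

import org.apache.kudu.mapreduce.CommandLineParser;
import org.apache.kudu.mapreduce.tools.ExportCsv;
import org.apache.kudu.mapreduce.tools.ImportParquet;

public class KuduToolsDriver {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Both tools read the Kudu master addresses from the configuration,
    // keyed by CommandLineParser.MASTER_ADDRESSES_KEY (as the tests do).
    conf.set(CommandLineParser.MASTER_ADDRESSES_KEY, "kudu-master:7051");

    // exportcsv args: <columns> <table.name> <output.dir>;
    // "*" projects every column, written as tab-separated text.
    int rc = ToolRunner.run(conf, new ExportCsv(),
        new String[] {"*", "metrics", "/tmp/metrics-export"});

    // importparquet args: <table.name> <input.dir>.
    if (rc == 0) {
      rc = ToolRunner.run(conf, new ImportParquet(),
          new String[] {"metrics_copy", "/tmp/metrics-parquet"});
    }
    System.exit(rc);
  }
}
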
http://git-wip-us.apache.org/repos/asf/kudu/blob/5d53a3b7/java/kudu-client-tools/src/main/java/org/apache/kudu/mapreduce/tools/ParquetReadSupport.java ---------------------------------------------------------------------- diff --git a/java/kudu-client-tools/src/main/java/org/apache/kudu/mapreduce/tools/ParquetReadSupport.java b/java/kudu-client-tools/src/main/java/org/apache/kudu/mapreduce/tools/ParquetReadSupport.java new file mode 100644 index 0000000..6762f99 --- /dev/null +++ b/java/kudu-client-tools/src/main/java/org/apache/kudu/mapreduce/tools/ParquetReadSupport.java @@ -0,0 +1,36 @@ +/** + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. See accompanying LICENSE file. + */ + +package org.apache.kudu.mapreduce.tools; + +import org.apache.parquet.example.data.Group; +import org.apache.parquet.hadoop.api.DelegatingReadSupport; +import org.apache.parquet.hadoop.api.InitContext; +import org.apache.parquet.hadoop.example.GroupReadSupport; + +/** + * Read support for Apache Parquet. + */ +public final class ParquetReadSupport extends DelegatingReadSupport<Group> { + + public ParquetReadSupport() { + super(new GroupReadSupport()); + } + + @Override + public org.apache.parquet.hadoop.api.ReadSupport.ReadContext init(InitContext context) { + return super.init(context); + } +} http://git-wip-us.apache.org/repos/asf/kudu/blob/5d53a3b7/java/kudu-client-tools/src/test/java/org/apache/kudu/mapreduce/tools/ITExportCsv.java ---------------------------------------------------------------------- diff --git a/java/kudu-client-tools/src/test/java/org/apache/kudu/mapreduce/tools/ITExportCsv.java b/java/kudu-client-tools/src/test/java/org/apache/kudu/mapreduce/tools/ITExportCsv.java new file mode 100644 index 0000000..984ec63 --- /dev/null +++ b/java/kudu-client-tools/src/test/java/org/apache/kudu/mapreduce/tools/ITExportCsv.java @@ -0,0 +1,88 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+
+package org.apache.kudu.mapreduce.tools;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.kudu.Schema;
+import org.apache.kudu.client.BaseKuduTest;
+import org.apache.kudu.mapreduce.CommandLineParser;
+import org.apache.kudu.mapreduce.HadoopTestingUtility;
+
+public class ITExportCsv extends BaseKuduTest {
+
+  private static final String TABLE_NAME =
+      ITExportCsv.class.getName() + "-" + System.currentTimeMillis();
+
+  private static final HadoopTestingUtility HADOOP_UTIL = new HadoopTestingUtility();
+
+  private static Schema schema;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    BaseKuduTest.setUpBeforeClass();
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    try {
+      BaseKuduTest.tearDownAfterClass();
+    } finally {
+      HADOOP_UTIL.cleanup();
+    }
+  }
+
+  @Test
+  public void test() throws Exception {
+    Configuration conf = new Configuration();
+    String testHome =
+        HADOOP_UTIL.setupAndGetTestDir(ITExportCsv.class.getName(), conf).getAbsolutePath();
+
+    // Create a table with one empty tablet and 3 tablets of 3 rows each.
+    createFourTabletsTableWithNineRows(TABLE_NAME);
+    String[] args = new String[] {
+        "-D" + CommandLineParser.MASTER_ADDRESSES_KEY + "=" + getMasterAddresses(),
+        "*", TABLE_NAME, testHome + "/exportdata"};
+
+    GenericOptionsParser parser = new GenericOptionsParser(conf, args);
+    Job job = ExportCsv.createSubmittableJob(parser.getConfiguration(), parser.getRemainingArgs());
+    assertTrue("Test job did not end properly", job.waitForCompletion(true));
+
+    String csvContent = readCsvFile(new File(testHome + "/exportdata/part-m-00001"));
+    assertEquals(3, csvContent.split("\n").length);
+    assertEquals("a string", csvContent.split("\n")[0].split("\t")[3]);
+  }
+
+  private String readCsvFile(File data) throws IOException {
+    FileInputStream fis = new FileInputStream(data);
+    return IOUtils.toString(fis, "UTF-8");
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kudu/blob/5d53a3b7/java/kudu-client-tools/src/test/java/org/apache/kudu/mapreduce/tools/ITImportParquet.java
----------------------------------------------------------------------
diff --git a/java/kudu-client-tools/src/test/java/org/apache/kudu/mapreduce/tools/ITImportParquet.java b/java/kudu-client-tools/src/test/java/org/apache/kudu/mapreduce/tools/ITImportParquet.java
new file mode 100644
index 0000000..0761a75
--- /dev/null
+++ b/java/kudu-client-tools/src/test/java/org/apache/kudu/mapreduce/tools/ITImportParquet.java
@@ -0,0 +1,147 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.kudu.mapreduce.tools; + +import static org.apache.parquet.hadoop.metadata.CompressionCodecName.UNCOMPRESSED; +import static org.apache.parquet.schema.MessageTypeParser.parseMessageType; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.ArrayList; + +import com.google.common.collect.ImmutableList; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.util.GenericOptionsParser; +import org.apache.kudu.client.KuduTable; +import org.apache.parquet.column.ParquetProperties; +import org.apache.parquet.example.data.Group; +import org.apache.parquet.example.data.simple.SimpleGroupFactory; +import org.apache.parquet.hadoop.ParquetWriter; +import org.apache.parquet.hadoop.example.GroupWriteSupport; +import org.apache.parquet.schema.MessageType; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.kudu.ColumnSchema; +import org.apache.kudu.Schema; +import org.apache.kudu.Type; +import org.apache.kudu.client.BaseKuduTest; +import org.apache.kudu.client.CreateTableOptions; +import org.apache.kudu.mapreduce.CommandLineParser; +import org.apache.kudu.mapreduce.HadoopTestingUtility; + +public class ITImportParquet extends BaseKuduTest { + + private static final String TABLE_NAME = + ITImportParquet.class.getName() + "-" + System.currentTimeMillis(); + + private static final HadoopTestingUtility HADOOP_UTIL = new HadoopTestingUtility(); + + private static Schema schema; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + BaseKuduTest.setUpBeforeClass(); + + ArrayList<ColumnSchema> columns = new ArrayList<ColumnSchema>(4); + columns.add(new ColumnSchema.ColumnSchemaBuilder("key", Type.INT32) + .key(true) + .build()); + columns.add(new ColumnSchema.ColumnSchemaBuilder("column1_i", Type.INT32) + .build()); + columns.add(new ColumnSchema.ColumnSchemaBuilder("column2_d", Type.DOUBLE) + .build()); + columns.add(new ColumnSchema.ColumnSchemaBuilder("column3_s", Type.STRING) + .nullable(true) + .build()); + columns.add(new ColumnSchema.ColumnSchemaBuilder("column4_b", Type.BOOL) + .build()); + schema = new Schema(columns); + + createTable(TABLE_NAME, schema, + new CreateTableOptions().setRangePartitionColumns(ImmutableList.of("key"))); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + try { + BaseKuduTest.tearDownAfterClass(); + } finally { + HADOOP_UTIL.cleanup(); + } + } + + @Test + public void test() throws Exception { + Configuration conf = new Configuration(); + String testHome = + HADOOP_UTIL.setupAndGetTestDir(ITImportCsv.class.getName(), conf).getAbsolutePath(); + + // Create a 4 records parquet input file. 
+ Path data = new Path(testHome, "data.parquet"); + writeParquetFile(data,conf); + + StringBuilder sb = new StringBuilder(); + for (ColumnSchema col : schema.getColumns()) { + sb.append(col.getName()); + sb.append(","); + } + sb.deleteCharAt(sb.length() - 1); + String[] args = new String[] { "-D" + CommandLineParser.MASTER_ADDRESSES_KEY + "=" + getMasterAddresses(), + TABLE_NAME, data.toString()}; + + GenericOptionsParser parser = new GenericOptionsParser(conf, args); + Job job = ImportParquet.createSubmittableJob(parser.getConfiguration(), parser.getRemainingArgs()); + assertTrue("Test job did not end properly", job.waitForCompletion(true)); + + KuduTable openTable = openTable(TABLE_NAME); + assertEquals(4, countRowsInScan( + client.newScannerBuilder(openTable).build())); + assertEquals("INT32 key=1, INT32 column1_i=3, DOUBLE column2_d=2.3, STRING column3_s=some string, " + + "BOOL column4_b=true",scanTableToStrings(openTable).get(0)); + } + + private void writeParquetFile(Path data,Configuration conf) throws IOException { + MessageType schema = parseMessageType( + "message test { " + + "required int32 key; " + + "required int32 column1_i; " + + "required double column2_d; " + + "required binary column3_s; " + + "required boolean column4_b; " + + "} "); + GroupWriteSupport.setSchema(schema, conf); + SimpleGroupFactory f = new SimpleGroupFactory(schema); + ParquetWriter<Group> writer = new ParquetWriter<Group>(data, new GroupWriteSupport(), + UNCOMPRESSED, 1024, 1024, 512, true, false, ParquetProperties.WriterVersion.PARQUET_1_0, conf); + + writer.write(f.newGroup().append("key", 1).append("column1_i", 3).append("column2_d", 2.3) + .append("column3_s", "some string").append("column4_b", true)); + writer.write(f.newGroup().append("key", 2).append("column1_i", 5).append("column2_d", 4.5) + .append("column3_s", "some more").append("column4_b", false)); + writer.write(f.newGroup().append("key", 3).append("column1_i", 7).append("column2_d", 5.6) + .append("column3_s", "some more and more").append("column4_b", true)); + writer.write(f.newGroup().append("key", 4).append("column1_i", 9).append("column2_d",10.9) + .append("column3_s", "some more and alst").append("column4_b", false)); + writer.close(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kudu/blob/5d53a3b7/java/kudu-client-tools/src/test/java/org/apache/kudu/mapreduce/tools/ITImportParquetPreCheck.java ---------------------------------------------------------------------- diff --git a/java/kudu-client-tools/src/test/java/org/apache/kudu/mapreduce/tools/ITImportParquetPreCheck.java b/java/kudu-client-tools/src/test/java/org/apache/kudu/mapreduce/tools/ITImportParquetPreCheck.java new file mode 100644 index 0000000..ab332ed --- /dev/null +++ b/java/kudu-client-tools/src/test/java/org/apache/kudu/mapreduce/tools/ITImportParquetPreCheck.java @@ -0,0 +1,151 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.kudu.mapreduce.tools; + +import static org.apache.parquet.hadoop.metadata.CompressionCodecName.UNCOMPRESSED; +import static org.apache.parquet.schema.MessageTypeParser.parseMessageType; +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.util.ArrayList; + +import com.google.common.collect.ImmutableList; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.util.GenericOptionsParser; +import org.apache.kudu.client.KuduTable; +import org.apache.parquet.column.ParquetProperties; +import org.apache.parquet.example.data.Group; +import org.apache.parquet.example.data.simple.SimpleGroupFactory; +import org.apache.parquet.hadoop.ParquetWriter; +import org.apache.parquet.hadoop.example.GroupWriteSupport; +import org.apache.parquet.schema.MessageType; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import org.apache.kudu.ColumnSchema; +import org.apache.kudu.Schema; +import org.apache.kudu.Type; +import org.apache.kudu.client.BaseKuduTest; +import org.apache.kudu.client.CreateTableOptions; +import org.apache.kudu.mapreduce.CommandLineParser; +import org.apache.kudu.mapreduce.HadoopTestingUtility; + +public class ITImportParquetPreCheck extends BaseKuduTest { + + private static final String TABLE_NAME = + ITImportParquet.class.getName() + "-" + System.currentTimeMillis(); + + private static final HadoopTestingUtility HADOOP_UTIL = new HadoopTestingUtility(); + + private static Schema schema; + + @Rule + public ExpectedException thrown = ExpectedException.none(); + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + BaseKuduTest.setUpBeforeClass(); + + ArrayList<ColumnSchema> columns = new ArrayList<ColumnSchema>(4); + columns.add(new ColumnSchema.ColumnSchemaBuilder("key", Type.INT32) + .key(true) + .build()); + columns.add(new ColumnSchema.ColumnSchemaBuilder("column1_i", Type.INT32) + .build()); + columns.add(new ColumnSchema.ColumnSchemaBuilder("column2_d", Type.DOUBLE) + .build()); + columns.add(new ColumnSchema.ColumnSchemaBuilder("column3_s", Type.STRING) + .nullable(true) + .build()); + columns.add(new ColumnSchema.ColumnSchemaBuilder("column4_b", Type.BOOL) + .build()); + schema = new Schema(columns); + + createTable(TABLE_NAME, schema, + new CreateTableOptions().setRangePartitionColumns(ImmutableList.of("key"))); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + try { + BaseKuduTest.tearDownAfterClass(); + } finally { + HADOOP_UTIL.cleanup(); + } + } + + @Test + public void test() throws Exception { + Configuration conf = new Configuration(); + String testHome = + HADOOP_UTIL.setupAndGetTestDir(ITImportCsv.class.getName(), conf).getAbsolutePath(); + + // Create a 4 records parquet input file. 
+ Path data = new Path(testHome, "data.parquet"); + writeParquetFile(data,conf); + + StringBuilder sb = new StringBuilder(); + for (ColumnSchema col : schema.getColumns()) { + sb.append(col.getName()); + sb.append(","); + } + sb.deleteCharAt(sb.length() - 1); + String[] args = new String[] { "-D" + CommandLineParser.MASTER_ADDRESSES_KEY + "=" + + getMasterAddresses(), TABLE_NAME, data.toString()}; + + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("The column column1_i does not exist in Parquet schema"); + + GenericOptionsParser parser = new GenericOptionsParser(conf, args); + Job job = ImportParquet.createSubmittableJob(parser.getConfiguration(), parser.getRemainingArgs()); + job.waitForCompletion(true); + + KuduTable openTable = openTable(TABLE_NAME); + assertEquals(0, countRowsInScan(client.newScannerBuilder(openTable).build())); + } + + private void writeParquetFile(Path data,Configuration conf) throws IOException { + MessageType schema = parseMessageType( + "message test { " + + "required int32 key; " + + "required int32 column1_i_s; " + + "required binary column2_d; " + + "required binary column3_s; " + + "required boolean column4_b; " + + "} "); + GroupWriteSupport.setSchema(schema, conf); + SimpleGroupFactory f = new SimpleGroupFactory(schema); + ParquetWriter<Group> writer = new ParquetWriter<Group>(data, new GroupWriteSupport(), + UNCOMPRESSED, 1024, 1024, 512, true, false, ParquetProperties.WriterVersion.PARQUET_1_0, conf); + + writer.write(f.newGroup().append("key", 1).append("column1_i_s", 292).append("column2_d", "no type") + .append("column3_s", "some string").append("column4_b", true)); + writer.write(f.newGroup().append("key", 2).append("column1_i_s", 23).append("column2_d", "no type") + .append("column3_s", "some more").append("column4_b", false)); + writer.write(f.newGroup().append("key", 3).append("column1_i_s", 32).append("column2_d", "no type") + .append("column3_s", "some more and more").append("column4_b", true)); + writer.write(f.newGroup().append("key", 4).append("column1_i_s", 22).append("column2_d", "no type") + .append("column3_s", "some more and alst").append("column4_b", false)); + writer.close(); + } +} http://git-wip-us.apache.org/repos/asf/kudu/blob/5d53a3b7/java/kudu-spark-tools/pom.xml ---------------------------------------------------------------------- diff --git a/java/kudu-spark-tools/pom.xml b/java/kudu-spark-tools/pom.xml index c2eb57f..98ffe28 100644 --- a/java/kudu-spark-tools/pom.xml +++ b/java/kudu-spark-tools/pom.xml @@ -18,7 +18,8 @@ // specific language governing permissions and limitations // under the License. 
--> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.apache.kudu</groupId> @@ -98,6 +99,11 @@ <scope>test</scope> </dependency> <dependency> + <groupId>com.databricks</groupId> + <artifactId>spark-avro_2.10</artifactId> + <version>${sparkavro.version}</version> + </dependency> + <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>${junit.version}</version> http://git-wip-us.apache.org/repos/asf/kudu/blob/5d53a3b7/java/kudu-spark-tools/src/main/scala/org/apache/kudu/spark/tools/ImportExportFiles.scala ---------------------------------------------------------------------- diff --git a/java/kudu-spark-tools/src/main/scala/org/apache/kudu/spark/tools/ImportExportFiles.scala b/java/kudu-spark-tools/src/main/scala/org/apache/kudu/spark/tools/ImportExportFiles.scala new file mode 100644 index 0000000..bc2f0a3 --- /dev/null +++ b/java/kudu-spark-tools/src/main/scala/org/apache/kudu/spark/tools/ImportExportFiles.scala @@ -0,0 +1,159 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+
+package org.apache.kudu.spark.tools
+
+import org.apache.kudu.client.KuduClient
+import org.apache.kudu.spark.tools.ImportExportKudu.ArgsCls
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.{SparkConf, SparkContext}
+import org.slf4j.{Logger, LoggerFactory}
+import org.apache.kudu.spark.kudu._
+import com.google.common.annotations.VisibleForTesting
+
+object ImportExportKudu {
+  val LOG: Logger = LoggerFactory.getLogger(ImportExportKudu.getClass)
+
+  def fail(msg: String): Nothing = {
+    System.err.println(msg)
+    sys.exit(1)
+  }
+
+  def usage: String =
+    s"""
+      | Usage: --operation=import/export --format=<data-format(csv,parquet,avro)> --master-addrs=<master-addrs> --path=<path> --table-name=<table-name>
+      | where
+      | operation: import data into or export data from Kudu tables, default: import
+      | format: format of the data to import/export; csv, parquet and avro are supported, default: csv
+      | masterAddrs: comma-separated addresses of Kudu master nodes, default: localhost
+      | path: input or output path for the import/export operation, default: file://
+      | tableName: name of the table to import/export, default: ""
+      | columns: comma-separated column names for the select statement on export from a Kudu table, default: *
+      | delimiter: delimiter for csv import/export, default: ,
+      | header: whether the csv data has a header on import/export, default: false
+      | inferschema: infer the schema from the csv input on import, default: false
+    """.stripMargin
+
+  case class ArgsCls(operation: String = "import",
+                     format: String = "csv",
+                     masterAddrs: String = "localhost",
+                     path: String = "file://",
+                     tableName: String = "",
+                     columns: String = "*",
+                     delimiter: String = ",",
+                     header: String = "false",
+                     inferschema: String = "false")
+
+  object ArgsCls {
+    private def parseInner(options: ArgsCls, args: List[String]): ArgsCls = {
+      LOG.info(args.mkString(","))
+      args match {
+        case Nil => options
+        case "--help" :: _ =>
+          System.err.println(usage)
+          sys.exit(0)
+        case flag :: Nil => fail(s"flag $flag has no value\n$usage")
+        case flag :: value :: tail =>
+          val newOptions: ArgsCls = flag match {
+            case "--operation" => options.copy(operation = value)
+            case "--format" => options.copy(format = value)
+            case "--master-addrs" => options.copy(masterAddrs = value)
+            case "--path" => options.copy(path = value)
+            case "--table-name" => options.copy(tableName = value)
+            case "--columns" => options.copy(columns = value)
+            case "--delimiter" => options.copy(delimiter = value)
+            case "--header" => options.copy(header = value)
+            case "--inferschema" => options.copy(inferschema = value)
+            case _ => fail(s"unknown argument given $flag")
+          }
+          parseInner(newOptions, tail)
+      }
+    }
+
+    def parse(args: Array[String]): ArgsCls = {
+      parseInner(ArgsCls(), args.flatMap(_.split('=')).toList)
+    }
+  }
+}
+
+object ImportExportFiles {
+
+  import ImportExportKudu.{LOG, fail}
+
+  var sqlContext: SQLContext = _
+  var kuduOptions: Map[String, String] = _
+
+  def run(args: ArgsCls, sc: SparkContext, sqlContext: SQLContext): Unit = {
+    val kc = new KuduContext(args.masterAddrs, sc)
+
+    val client: KuduClient = kc.syncClient
+    if (!client.tableExists(args.tableName)) {
+      fail(s"table ${args.tableName} doesn't exist")
+    }
+
+    kuduOptions = Map(
+      "kudu.table" -> args.tableName,
+      "kudu.master" -> args.masterAddrs)
+
+    args.operation match {
+      case "import" =>
+        args.format match {
+          case "csv" =>
+            val df = sqlContext.read.option("header", args.header)
+              .option("delimiter", args.delimiter).csv(args.path)
+            kc.upsertRows(df, args.tableName)
+          case "parquet" =>
+            val df = sqlContext.read.parquet(args.path)
+            kc.upsertRows(df, args.tableName)
+          case "avro" =>
+            val df = sqlContext.read.format("com.databricks.spark.avro").load(args.path)
+            kc.upsertRows(df, args.tableName)
+          case _ => fail(s"unknown format given: ${args.format}")
+        }
+      case "export" =>
+        val df = sqlContext.read.options(kuduOptions).kudu.select(args.columns)
+        args.format match {
+          case "csv" =>
+            df.write.format("com.databricks.spark.csv").option("header", args.header)
+              .option("delimiter", args.delimiter).save(args.path)
+          case "parquet" =>
+            df.write.parquet(args.path)
+          case "avro" =>
+            df.write.format("com.databricks.spark.avro").save(args.path)
+          case _ => fail(s"unknown format given: ${args.format}")
+        }
+      case _ => fail(s"unknown operation given: ${args.operation}")
+    }
+  }
+
+  /**
+   * Entry point for testing. SparkContext is a singleton,
+   * so tests must create and manage their own.
+   */
+  @VisibleForTesting
+  def testMain(args: Array[String], sc: SparkContext): Unit = {
+    sqlContext = new SQLContext(sc)
+    run(ArgsCls.parse(args), sc, sqlContext)
+  }
+
+  def main(args: Array[String]): Unit = {
+    val conf = new SparkConf().setAppName("Import or export files (csv, parquet, avro) from/to Kudu")
+    val sc = new SparkContext(conf)
+    testMain(args, sc)
+  }
+}
+
http://git-wip-us.apache.org/repos/asf/kudu/blob/5d53a3b7/java/kudu-spark-tools/src/test/scala/org/apache/kudu/spark/tools/TestImportExportFiles.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark-tools/src/test/scala/org/apache/kudu/spark/tools/TestImportExportFiles.scala b/java/kudu-spark-tools/src/test/scala/org/apache/kudu/spark/tools/TestImportExportFiles.scala
new file mode 100644
index 0000000..2507853
--- /dev/null
+++ b/java/kudu-spark-tools/src/test/scala/org/apache/kudu/spark/tools/TestImportExportFiles.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.kudu.spark.tools + +import java.io.{File, FileOutputStream} + +import org.apache.kudu.ColumnSchema.ColumnSchemaBuilder +import org.apache.kudu.{Schema, Type} +import org.apache.kudu.client.CreateTableOptions +import org.apache.kudu.spark.kudu._ +import org.apache.spark.sql.SQLContext +import org.junit.Assert._ +import org.junit.runner.RunWith +import org.scalatest.junit.JUnitRunner +import org.scalatest.{FunSuite, Matchers} +import org.spark_project.guava.collect.ImmutableList + +import scala.collection.JavaConverters._ + +@RunWith(classOf[JUnitRunner]) +class TestImportExportFiles extends FunSuite with TestContext with Matchers { + + private val TABLE_NAME: String = classOf[TestImportExportFiles].getName + "-" + System.currentTimeMillis + var sqlContext : SQLContext = _ + var kuduOptions : Map[String, String] = _ + + test("Spark Import Export") { + val schema: Schema = { + val columns = ImmutableList.of( + new ColumnSchemaBuilder("key", Type.STRING).key(true).build(), + new ColumnSchemaBuilder("column1_i", Type.STRING).build(), + new ColumnSchemaBuilder("column2_d", Type.STRING).nullable(true).build(), + new ColumnSchemaBuilder("column3_s", Type.STRING).build(), + new ColumnSchemaBuilder("column4_b", Type.STRING).build()) + new Schema(columns) + } + val tableOptions = new CreateTableOptions().setRangePartitionColumns(List("key").asJava) + .setNumReplicas(1) + kuduClient.createTable(TABLE_NAME, schema, tableOptions) + + val data: File = new File("target/", TABLE_NAME+".csv") + writeCsvFile(data) + + ImportExportFiles.testMain(Array("--operation=import", + "--format=csv", + s"--master-addrs=${miniCluster.getMasterAddresses}", + s"--path=${"target/"+TABLE_NAME+".csv"}", + s"--table-name=${TABLE_NAME}", + "--delimiter=,", + "--header=true", + "--inferschema=true"), sc) + val rdd = kuduContext.kuduRDD(sc, TABLE_NAME, List("key")) + assert(rdd.collect.length == 4) + assertEquals(rdd.collect().mkString(","),"[1],[2],[3],[4]") + } + + def writeCsvFile(data: File) + { + val fos: FileOutputStream = new FileOutputStream(data) + fos.write("key,column1_i,column2_d,column3_s,column4_b\n".getBytes) + fos.write("1,3,2.3,some string,true\n".getBytes) + fos.write("2,5,4.5,some more,false\n".getBytes) + fos.write("3,7,1.2,wait this is not a double bad row,true\n".getBytes) + fos.write("4,9,10.1,trailing separator isn't bad mkay?,true\n".getBytes) + fos.close() + } +} http://git-wip-us.apache.org/repos/asf/kudu/blob/5d53a3b7/java/pom.xml ---------------------------------------------------------------------- diff --git a/java/pom.xml b/java/pom.xml index 5893735..fed039a 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -29,10 +29,10 @@ <!-- inherit from the ASF POM for distribution management --> <parent> - <groupId>org.apache</groupId> - <artifactId>apache</artifactId> - <version>18</version> - <relativePath/> + <groupId>org.apache</groupId> + <artifactId>apache</artifactId> + <version>18</version> + <relativePath/> </parent> <name>Kudu</name> @@ -72,6 +72,8 @@ <protobuf.version>3.3.0</protobuf.version> <slf4j.version>1.7.25</slf4j.version> <yetus.version>0.4.0</yetus.version> + <parquet.version>1.9.0</parquet.version> + <sparkavro.version>3.2.0</sparkavro.version> <!-- Scala Library dependencies --> <spark1.version>1.6.3</spark1.version> @@ -83,8 +85,9 @@ <!-- Misc variables --> <testdata.dir>target/testdata</testdata.dir> <testArgLine>-enableassertions -Xmx1900m - -Djava.security.egd=file:/dev/./urandom -Djava.net.preferIPv4Stack=true - -Djava.awt.headless=true</testArgLine> + 
-Djava.security.egd=file:/dev/./urandom -Djava.net.preferIPv4Stack=true + -Djava.awt.headless=true + </testArgLine> </properties> <modules>
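
A usage sketch for the new Spark tool, callable from Java since the Scala object exposes a static main; the master address, path, and table name are placeholders, and a Spark master must already be configured (e.g. when launched through spark-submit with --class org.apache.kudu.spark.tools.ImportExportFiles), because main() creates its own SparkContext:

import org.apache.kudu.spark.tools.ImportExportFiles;

public class SparkCsvImportExample {
  public static void main(String[] args) throws Exception {
    // Import a local CSV file (with header) into an existing Kudu table.
    ImportExportFiles.main(new String[] {
        "--operation=import",
        "--format=csv",
        "--master-addrs=kudu-master:7051",
        "--path=file:///tmp/data.csv",
        "--table-name=my_table",
        "--delimiter=,",
        "--header=true"});
  }
}

Note that imports go through KuduContext.upsertRows, so re-running the same import updates rows with matching keys instead of failing on duplicates.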