Repository: kudu
Updated Branches:
  refs/heads/master 4b045c1d3 -> 5d53a3b71


Kudu client tools for Hadoop and Spark import/export (CSV, Parquet, Avro)

Change-Id: If462af948651f3869b444e82151c3559fde19142
Reviewed-on: http://gerrit.cloudera.org:8080/7421
Reviewed-by: Jean-Daniel Cryans <jdcry...@apache.org>
Tested-by: Jean-Daniel Cryans <jdcry...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/5d53a3b7
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/5d53a3b7
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/5d53a3b7

Branch: refs/heads/master
Commit: 5d53a3b7146c6bcf330ddae036cfc42eeb7b7849
Parents: 4b045c1
Author: sany <sanysand...@gmail.com>
Authored: Fri Jul 14 16:54:57 2017 +0530
Committer: Jean-Daniel Cryans <jdcry...@apache.org>
Committed: Wed Aug 9 22:02:13 2017 +0000

----------------------------------------------------------------------
 java/kudu-client-tools/pom.xml                  |   5 +
 .../apache/kudu/mapreduce/tools/ExportCsv.java  | 109 +++++++++++
 .../kudu/mapreduce/tools/ExportCsvMapper.java   | 114 ++++++++++++
 .../kudu/mapreduce/tools/ImportParquet.java     | 180 +++++++++++++++++++
 .../mapreduce/tools/ImportParquetMapper.java    | 113 ++++++++++++
 .../mapreduce/tools/ParquetReadSupport.java     |  36 ++++
 .../kudu/mapreduce/tools/ITExportCsv.java       |  88 +++++++++
 .../kudu/mapreduce/tools/ITImportParquet.java   | 147 +++++++++++++++
 .../tools/ITImportParquetPreCheck.java          | 151 ++++++++++++++++
 java/kudu-spark-tools/pom.xml                   |   8 +-
 .../kudu/spark/tools/ImportExportFiles.scala    | 159 ++++++++++++++++
 .../spark/tools/TestImportExportFiles.scala     |  82 +++++++++
 java/pom.xml                                    |  15 +-
 13 files changed, 1200 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/5d53a3b7/java/kudu-client-tools/pom.xml
----------------------------------------------------------------------
diff --git a/java/kudu-client-tools/pom.xml b/java/kudu-client-tools/pom.xml
index d4908fa..65ac4e3 100644
--- a/java/kudu-client-tools/pom.xml
+++ b/java/kudu-client-tools/pom.xml
@@ -86,6 +86,11 @@
             <version>${slf4j.version}</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.parquet</groupId>
+            <artifactId>parquet-hadoop</artifactId>
+            <version>${parquet.version}</version>
+        </dependency>
     </dependencies>
 
     <build>

http://git-wip-us.apache.org/repos/asf/kudu/blob/5d53a3b7/java/kudu-client-tools/src/main/java/org/apache/kudu/mapreduce/tools/ExportCsv.java
----------------------------------------------------------------------
diff --git a/java/kudu-client-tools/src/main/java/org/apache/kudu/mapreduce/tools/ExportCsv.java b/java/kudu-client-tools/src/main/java/org/apache/kudu/mapreduce/tools/ExportCsv.java
new file mode 100644
index 0000000..3460a50
--- /dev/null
+++ b/java/kudu-client-tools/src/main/java/org/apache/kudu/mapreduce/tools/ExportCsv.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. See accompanying LICENSE file.
+ */
+
+package org.apache.kudu.mapreduce.tools;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+
+import org.apache.kudu.mapreduce.CommandLineParser;
+import org.apache.kudu.mapreduce.KuduTableInputFormat;
+import org.apache.kudu.mapreduce.KuduTableMapReduceUtil;
+
+/**
+ * Map-only job that reads Kudu rows and writes them into a CSV file.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public class ExportCsv extends Configured implements Tool {
+
+  static final String NAME = "exportcsv";
+  static final String DEFAULT_SEPARATOR = "\t";
+  static final String SEPARATOR_CONF_KEY = "exportcsv.separator";
+  static final String JOB_NAME_CONF_KEY = "exportcsv.job.name";
+  static final String COLUMNS_NAMES_KEY = "exportcsv.column.names";
+
+  /**
+   * Sets up the actual job.
+   *
+   * @param conf the current configuration
+   * @param args the command line parameters
+   * @return the newly created job
+   * @throws java.io.IOException when setting up the job fails
+   */
+  @SuppressWarnings("deprecation")
+  public static Job createSubmittableJob(Configuration conf, String[] args)
+          throws IOException, ClassNotFoundException {
+
+    Class<ExportCsvMapper> mapperClass = ExportCsvMapper.class;
+    conf.set(COLUMNS_NAMES_KEY, args[0]);
+    String tableName = args[1];
+    final Path outputDir = new Path(args[2]);
+
+    String jobName = conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName);
+    Job job = new Job(conf, jobName);
+    job.setJarByClass(mapperClass);
+    job.setInputFormatClass(KuduTableInputFormat.class);
+    new KuduTableMapReduceUtil.TableInputFormatConfiguratorWithCommandLineParser(job, tableName,
+      args[0]).configure();
+    job.setOutputFormatClass(TextOutputFormat.class);
+    job.setMapperClass(mapperClass);
+    job.setNumReduceTasks(0);
+    FileOutputFormat.setOutputPath(job, outputDir);
+    return job;
+  }
+
+  /*
+   * @param errorMsg error message. can be null
+   */
+  private static void usage(final String errorMsg) {
+    if (errorMsg != null && errorMsg.length() > 0) {
+      System.err.println("ERROR: " + errorMsg);
+    }
+    String usage = "Usage: " + NAME + " <colA,colB,colC> <table.name> <output.dir>\n\n" +
+        "Exports the given table and columns into the specified output path.\n" +
+        "The column names of the Kudu table must be specified in the form of\n" +
+        "comma-separated column names.\n" +
+        "Other options that may be specified with -D include:\n" +
+        "'-D" + SEPARATOR_CONF_KEY + "=|' - e.g. separate on pipes instead of tabs\n" +
+        "-D" + JOB_NAME_CONF_KEY + "=jobName - use the specified mapreduce job name for the " +
+        "export.\n" + CommandLineParser.getHelpSnippet();
+    System.err.println(usage);
+  }
+
+  @Override
+  public int run(String[] otherArgs) throws Exception {
+    if (otherArgs.length < 3) {
+      usage("Wrong number of arguments: " + otherArgs.length);
+      return -1;
+    }
+    Job job = createSubmittableJob(getConf(), otherArgs);
+    return job.waitForCompletion(true) ? 0 : 1;
+  }
+
+  public static void main(String[] args) throws Exception {
+    int status = ToolRunner.run(new ExportCsv(), args);
+    System.exit(status);
+  }
+}
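
For illustration, a minimal driver for the new exportcsv tool might look like the sketch
below. It mirrors the usage string (<columns> <table.name> <output.dir> plus optional -D
properties); the master address, column list, table name, and output directory are
placeholders, not values from this change.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.util.ToolRunner;

    import org.apache.kudu.mapreduce.CommandLineParser;
    import org.apache.kudu.mapreduce.tools.ExportCsv;

    public class ExportCsvDriverSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Point the job at the Kudu cluster (placeholder address) and switch the
        // default tab separator to a pipe; "exportcsv.separator" is the key
        // defined as SEPARATOR_CONF_KEY in ExportCsv above.
        conf.set(CommandLineParser.MASTER_ADDRESSES_KEY, "kudu-master-1:7051");
        conf.set("exportcsv.separator", "|");
        // Arguments mirror the usage string: <columns> <table.name> <output.dir>.
        int exitCode = ToolRunner.run(conf, new ExportCsv(),
            new String[] {"key,col_a,col_b", "my_table", "/tmp/kudu_export"});
        System.exit(exitCode);
      }
    }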

http://git-wip-us.apache.org/repos/asf/kudu/blob/5d53a3b7/java/kudu-client-tools/src/main/java/org/apache/kudu/mapreduce/tools/ExportCsvMapper.java
----------------------------------------------------------------------
diff --git a/java/kudu-client-tools/src/main/java/org/apache/kudu/mapreduce/tools/ExportCsvMapper.java b/java/kudu-client-tools/src/main/java/org/apache/kudu/mapreduce/tools/ExportCsvMapper.java
new file mode 100644
index 0000000..bbe855c
--- /dev/null
+++ b/java/kudu-client-tools/src/main/java/org/apache/kudu/mapreduce/tools/ExportCsvMapper.java
@@ -0,0 +1,114 @@
+/**
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. See accompanying LICENSE file.
+ */
+
+package org.apache.kudu.mapreduce.tools;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Schema;
+import org.apache.kudu.client.Bytes;
+import org.apache.kudu.client.RowResult;
+
+/**
+ * Mapper that ingests Kudu rows and turns them into CSV lines.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class ExportCsvMapper extends Mapper<NullWritable, RowResult, NullWritable, Text> {
+
+  private static final NullWritable NULL_KEY = NullWritable.get();
+
+  /** Column separator */
+  private String separator;
+
+  private Schema schema;
+
+  /**
+   * Handles initializing this class with objects specific to it.
+   */
+  @Override
+  protected void setup(Context context) {
+    Configuration conf = context.getConfiguration();
+    this.separator = conf.get(ExportCsv.SEPARATOR_CONF_KEY, ExportCsv.DEFAULT_SEPARATOR);
+  }
+
+  /**
+   * Converts Kudu RowResult into a line of CSV text.
+   */
+  @Override
+  public void map(NullWritable key, RowResult value, Context context) throws IOException {
+    this.schema = value.getSchema();
+    try {
+      context.write(NULL_KEY, new Text(rowResultToString(value)));
+    } catch (InterruptedException e) {
+      throw new IOException("Failing task since it was interrupted", e);
+    }
+  }
+
+  private String rowResultToString(RowResult value) {
+    StringBuilder buf = new StringBuilder();
+    for (int i = 0; i < schema.getColumnCount(); i++) {
+      ColumnSchema col = schema.getColumnByIndex(i);
+      if (i != 0) {
+        buf.append(this.separator);
+      }
+
+      switch (col.getType()) {
+        case INT8:
+          buf.append(value.getByte(i));
+          break;
+        case INT16:
+          buf.append(value.getShort(i));
+          break;
+        case INT32:
+          buf.append(value.getInt(i));
+          break;
+        case INT64:
+          buf.append(value.getLong(i));
+          break;
+        case STRING:
+          buf.append(value.getString(i));
+          break;
+        case BINARY:
+          buf.append(Bytes.pretty(value.getBinaryCopy(i)));
+          break;
+        case FLOAT:
+          buf.append(value.getFloat(i));
+          break;
+        case DOUBLE:
+          buf.append(value.getDouble(i));
+          break;
+        case BOOL:
+          buf.append(value.getBoolean(i));
+          break;
+        case UNIXTIME_MICROS:
+          buf.append(value.getLong(i));
+          break;
+        default:
+          buf.append("<unknown type!>");
+          break;
+      }
+    }
+    return buf.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/kudu/blob/5d53a3b7/java/kudu-client-tools/src/main/java/org/apache/kudu/mapreduce/tools/ImportParquet.java
----------------------------------------------------------------------
diff --git a/java/kudu-client-tools/src/main/java/org/apache/kudu/mapreduce/tools/ImportParquet.java b/java/kudu-client-tools/src/main/java/org/apache/kudu/mapreduce/tools/ImportParquet.java
new file mode 100644
index 0000000..eff500d
--- /dev/null
+++ b/java/kudu-client-tools/src/main/java/org/apache/kudu/mapreduce/tools/ImportParquet.java
@@ -0,0 +1,180 @@
+/**
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. See accompanying LICENSE file.
+ */
+
+package org.apache.kudu.mapreduce.tools;
+
+import static java.sql.Types.TIMESTAMP;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.hadoop.Footer;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.ParquetInputFormat;
+import org.apache.parquet.hadoop.example.GroupWriteSupport;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Type;
+import org.apache.kudu.client.KuduClient;
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.mapreduce.CommandLineParser;
+import org.apache.kudu.mapreduce.KuduTableMapReduceUtil;
+
+/**
+ * Map-only job that reads Apache Parquet files and inserts them into a single Kudu table.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public class ImportParquet extends Configured implements Tool {
+
+  static final String NAME = "importparquet";
+  static final String JOB_NAME_CONF_KEY = "importparquet.job.name";
+  static final String PARQUET_INPUT_SCHEMA = "importparquet.input.schema";
+
+  /**
+   * Sets up the actual job.
+   *
+   * @param conf the current configuration
+   * @param args the command line parameters
+   * @return the newly created job
+   * @throws java.io.IOException when setting up the job fails
+   */
+  @SuppressWarnings("deprecation")
+  public static Job createSubmittableJob(Configuration conf, String[] args)
+      throws IOException, ClassNotFoundException {
+
+    final String tableName = args[0];
+    Path inputDir = new Path(args[1]);
+
+    List<Footer> footers = new ArrayList<Footer>();
+    footers.addAll(ParquetFileReader.readFooters(conf, inputDir));
+
+    MessageType schema = footers.get(0).getParquetMetadata().getFileMetaData().getSchema();
+    GroupWriteSupport.setSchema(schema, conf);
+    conf.set(PARQUET_INPUT_SCHEMA, schema.toString());
+
+    String jobName = conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName);
+    Job job = new Job(conf, jobName);
+    job.setJarByClass(ImportParquet.class);
+    job.setMapperClass(ImportParquetMapper.class);
+    job.setNumReduceTasks(0);
+    job.setInputFormatClass(ParquetInputFormat.class);
+    ParquetInputFormat.setReadSupportClass(job, ParquetReadSupport.class);
+    ParquetInputFormat.setInputPaths(job, inputDir);
+
+    CommandLineParser cmdLineParser = new CommandLineParser(conf);
+    KuduClient client = cmdLineParser.getClient();
+    KuduTable table = client.openTable(tableName);
+
+
+    // Pre-flight checks of input parquet schema and table schema.
+    for (ColumnSchema columnSchema : table.getSchema().getColumns()) {
+      if (schema.containsField(columnSchema.getName())) {
+        if (!schema.getType(columnSchema.getName()).asPrimitiveType().getPrimitiveTypeName()
+            .equals(getTypeName(columnSchema.getType()))) {
+          throw new IllegalArgumentException("The column type " +
+              getTypeName(columnSchema.getType()) + " does not exist in Parquet schema");
+        }
+      } else {
+        throw new IllegalArgumentException("The column " + columnSchema.getName() +
+            " does not exist in Parquet schema");
+      }
+    }
+    // Kudu doesn't support Parquet's TIMESTAMP.
+    Iterator<ColumnDescriptor> fields = schema.getColumns().iterator();
+    while (fields.hasNext()) {
+      ColumnDescriptor field = fields.next();
+      if (field.getType().equals(TIMESTAMP)) {
+        throw new IllegalArgumentException("This " + field.getType() +
+          " Parquet type is not supported in Kudu");
+      }
+    }
+
+    FileInputFormat.setInputPaths(job, inputDir);
+    new KuduTableMapReduceUtil.TableOutputFormatConfiguratorWithCommandLineParser(
+        job,
+        tableName)
+        .configure();
+    return job;
+  }
+
+  private static PrimitiveType.PrimitiveTypeName getTypeName(Type type) {
+    switch (type) {
+      case BOOL:
+        return PrimitiveType.PrimitiveTypeName.BOOLEAN;
+      case INT8:
+        return PrimitiveType.PrimitiveTypeName.INT32;
+      case INT16:
+        return PrimitiveType.PrimitiveTypeName.INT64;
+      case INT32:
+        return PrimitiveType.PrimitiveTypeName.INT32;
+      case INT64:
+        return PrimitiveType.PrimitiveTypeName.INT64;
+      case STRING:
+        return PrimitiveType.PrimitiveTypeName.BINARY;
+      case FLOAT:
+        return PrimitiveType.PrimitiveTypeName.FLOAT;
+      case DOUBLE:
+        return PrimitiveType.PrimitiveTypeName.DOUBLE;
+      default:
+        throw new IllegalArgumentException("Type " + type.getName() + " not recognized");
+    }
+  }
+
+  /*
+   * @param errorMsg error message. can be null
+   */
+  private static void usage(final String errorMsg) {
+    if (errorMsg != null && errorMsg.length() > 0) {
+      System.err.println("ERROR: " + errorMsg);
+    }
+    String usage =
+        "Usage: " + NAME + " <table.name> <input.dir>\n\n" +
+            "Imports the given input directory of Apache Parquet data into the specified table.\n" +
+            "Other options that may be specified with -D include:\n" +
+            "-D" + JOB_NAME_CONF_KEY + "=jobName - use the specified mapreduce job name for the " +
+            "import.\n" + CommandLineParser.getHelpSnippet();
+
+    System.err.println(usage);
+  }
+
+  @Override
+  public int run(String[] otherArgs) throws Exception {
+    if (otherArgs.length < 2) {
+      usage("Wrong number of arguments: " + otherArgs.length);
+      return -1;
+    }
+    Job job = createSubmittableJob(getConf(), otherArgs);
+    return job.waitForCompletion(true) ? 0 : 1;
+  }
+
+  public static void main(String[] args) throws Exception {
+    int status = ToolRunner.run(new ImportParquet(), args);
+    System.exit(status);
+  }
+}
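
Similarly, a hypothetical driver for the importparquet tool could hand the table name
and input directory straight to ToolRunner; the cluster address, table name, and HDFS
path below are illustrative only, not part of this patch.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.util.ToolRunner;

    import org.apache.kudu.mapreduce.CommandLineParser;
    import org.apache.kudu.mapreduce.tools.ImportParquet;

    public class ImportParquetDriverSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Placeholder Kudu master address; the tool opens the table through it
        // and checks the Parquet schema against the table schema before launching.
        conf.set(CommandLineParser.MASTER_ADDRESSES_KEY, "kudu-master-1:7051");
        // Arguments mirror the usage string: <table.name> <input.dir>.
        int exitCode = ToolRunner.run(conf, new ImportParquet(),
            new String[] {"my_table", "hdfs:///user/example/parquet_input"});
        System.exit(exitCode);
      }
    }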

http://git-wip-us.apache.org/repos/asf/kudu/blob/5d53a3b7/java/kudu-client-tools/src/main/java/org/apache/kudu/mapreduce/tools/ImportParquetMapper.java
----------------------------------------------------------------------
diff --git a/java/kudu-client-tools/src/main/java/org/apache/kudu/mapreduce/tools/ImportParquetMapper.java b/java/kudu-client-tools/src/main/java/org/apache/kudu/mapreduce/tools/ImportParquetMapper.java
new file mode 100644
index 0000000..bf40442
--- /dev/null
+++ b/java/kudu-client-tools/src/main/java/org/apache/kudu/mapreduce/tools/ImportParquetMapper.java
@@ -0,0 +1,113 @@
+/**
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. See accompanying LICENSE file.
+ */
+
+package org.apache.kudu.mapreduce.tools;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.MessageTypeParser;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Schema;
+import org.apache.kudu.client.Insert;
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.client.Operation;
+import org.apache.kudu.client.PartialRow;
+import org.apache.kudu.mapreduce.KuduTableMapReduceUtil;
+
+/**
+ * Mapper that ingests Apache Parquet lines and turns them into Kudu Inserts.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class ImportParquetMapper extends Mapper<LongWritable, Group, NullWritable, Operation> {
+
+  private static final NullWritable NULL_KEY = NullWritable.get();
+
+  private MessageType parquetSchema;
+
+  private KuduTable table;
+  private Schema schema;
+
+  /**
+   * Handles initializing this class with objects specific to it (i.e., the parser).
+   */
+  @Override
+  protected void setup(Context context) {
+    Configuration conf = context.getConfiguration();
+    parquetSchema = MessageTypeParser.parseMessageType(conf.get(
+      ImportParquet.PARQUET_INPUT_SCHEMA));
+
+    this.table = KuduTableMapReduceUtil.getTableFromContext(context);
+    this.schema = this.table.getSchema();
+  }
+
+  /**
+   * Convert a line of Parquet data into a Kudu Insert
+   */
+  @Override
+  public void map(LongWritable key, Group value, Context context)
+      throws IOException {
+
+    try {
+      Insert insert = this.table.newInsert();
+      PartialRow row = insert.getRow();
+      for (int i = 0; i < parquetSchema.getFields().size(); i++) {
+        String colName = parquetSchema.getFields().get(i).getName();
+        ColumnSchema col = this.schema.getColumn(colName);
+        String colValue = value.getValueToString(i, 0);
+        switch (col.getType()) {
+          case BOOL:
+            row.addBoolean(colName, Boolean.parseBoolean(colValue));
+            break;
+          case INT8:
+            row.addByte(colName, Byte.parseByte(colValue));
+            break;
+          case INT16:
+            row.addShort(colName, Short.parseShort(colValue));
+            break;
+          case INT32:
+            row.addInt(colName, Integer.parseInt(colValue));
+            break;
+          case INT64:
+            row.addLong(colName, Long.parseLong(colValue));
+            break;
+          case STRING:
+            row.addString(colName, colValue);
+            break;
+          case FLOAT:
+            row.addFloat(colName, Float.parseFloat(colValue));
+            break;
+          case DOUBLE:
+            row.addDouble(colName, Double.parseDouble(colValue));
+            break;
+          default:
+            throw new IllegalArgumentException("Type " + col.getType() + " not recognized");
+        }
+      }
+      context.write(NULL_KEY, insert);
+    } catch (InterruptedException e) {
+      throw new IOException("Failing task since it was interrupted", e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/kudu/blob/5d53a3b7/java/kudu-client-tools/src/main/java/org/apache/kudu/mapreduce/tools/ParquetReadSupport.java
----------------------------------------------------------------------
diff --git a/java/kudu-client-tools/src/main/java/org/apache/kudu/mapreduce/tools/ParquetReadSupport.java b/java/kudu-client-tools/src/main/java/org/apache/kudu/mapreduce/tools/ParquetReadSupport.java
new file mode 100644
index 0000000..6762f99
--- /dev/null
+++ b/java/kudu-client-tools/src/main/java/org/apache/kudu/mapreduce/tools/ParquetReadSupport.java
@@ -0,0 +1,36 @@
+/**
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. See accompanying LICENSE file.
+ */
+
+package org.apache.kudu.mapreduce.tools;
+
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.hadoop.api.DelegatingReadSupport;
+import org.apache.parquet.hadoop.api.InitContext;
+import org.apache.parquet.hadoop.example.GroupReadSupport;
+
+/**
+ * Read support for Apache Parquet.
+ */
+public final class ParquetReadSupport extends DelegatingReadSupport<Group> {
+
+  public ParquetReadSupport() {
+    super(new GroupReadSupport());
+  }
+
+  @Override
+  public org.apache.parquet.hadoop.api.ReadSupport.ReadContext init(InitContext context) {
+    return super.init(context);
+  }
+}

http://git-wip-us.apache.org/repos/asf/kudu/blob/5d53a3b7/java/kudu-client-tools/src/test/java/org/apache/kudu/mapreduce/tools/ITExportCsv.java
----------------------------------------------------------------------
diff --git a/java/kudu-client-tools/src/test/java/org/apache/kudu/mapreduce/tools/ITExportCsv.java b/java/kudu-client-tools/src/test/java/org/apache/kudu/mapreduce/tools/ITExportCsv.java
new file mode 100644
index 0000000..984ec63
--- /dev/null
+++ b/java/kudu-client-tools/src/test/java/org/apache/kudu/mapreduce/tools/ITExportCsv.java
@@ -0,0 +1,88 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.kudu.mapreduce.tools;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.kudu.Schema;
+import org.apache.kudu.client.BaseKuduTest;
+import org.apache.kudu.mapreduce.CommandLineParser;
+import org.apache.kudu.mapreduce.HadoopTestingUtility;
+
+public class ITExportCsv extends BaseKuduTest {
+
+  private static final String TABLE_NAME =
+    ITExportCsv.class.getName() + "-" + System.currentTimeMillis();
+
+  private static final HadoopTestingUtility HADOOP_UTIL = new HadoopTestingUtility();
+
+  private static Schema schema;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    BaseKuduTest.setUpBeforeClass();
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    try {
+      BaseKuduTest.tearDownAfterClass();
+    } finally {
+      HADOOP_UTIL.cleanup();
+    }
+  }
+
+  @Test
+  public void test() throws Exception {
+    Configuration conf = new Configuration();
+    String testHome =
+      HADOOP_UTIL.setupAndGetTestDir(ITExportCsv.class.getName(), conf).getAbsolutePath();
+
+    // Create a table with one empty tablet and 3 tablets of 3 rows each.
+    createFourTabletsTableWithNineRows(TABLE_NAME);
+    String[] args = new String[] {
+      "-D" + CommandLineParser.MASTER_ADDRESSES_KEY + "=" + getMasterAddresses(),
+      "*", TABLE_NAME, testHome + "/exportdata"};
+
+    GenericOptionsParser parser = new GenericOptionsParser(conf, args);
+    Job job = ExportCsv.createSubmittableJob(parser.getConfiguration(), parser.getRemainingArgs());
+    assertTrue("Test job did not end properly", job.waitForCompletion(true));
+
+    String csvContent = readCsvFile(new File(testHome + "/exportdata/part-m-00001"));
+    assertEquals(3, csvContent.split("\n").length);
+    assertEquals("a string", csvContent.split("\n")[0].split("\t")[3]);
+  }
+
+  private String readCsvFile(File data) throws IOException {
+    try (FileInputStream fis = new FileInputStream(data)) {
+      return IOUtils.toString(fis, "UTF-8");
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kudu/blob/5d53a3b7/java/kudu-client-tools/src/test/java/org/apache/kudu/mapreduce/tools/ITImportParquet.java
----------------------------------------------------------------------
diff --git a/java/kudu-client-tools/src/test/java/org/apache/kudu/mapreduce/tools/ITImportParquet.java b/java/kudu-client-tools/src/test/java/org/apache/kudu/mapreduce/tools/ITImportParquet.java
new file mode 100644
index 0000000..0761a75
--- /dev/null
+++ b/java/kudu-client-tools/src/test/java/org/apache/kudu/mapreduce/tools/ITImportParquet.java
@@ -0,0 +1,147 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.kudu.mapreduce.tools;
+
+import static org.apache.parquet.hadoop.metadata.CompressionCodecName.UNCOMPRESSED;
+import static org.apache.parquet.schema.MessageTypeParser.parseMessageType;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.kudu.client.KuduTable;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroupFactory;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.example.GroupWriteSupport;
+import org.apache.parquet.schema.MessageType;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Schema;
+import org.apache.kudu.Type;
+import org.apache.kudu.client.BaseKuduTest;
+import org.apache.kudu.client.CreateTableOptions;
+import org.apache.kudu.mapreduce.CommandLineParser;
+import org.apache.kudu.mapreduce.HadoopTestingUtility;
+
+public class ITImportParquet extends BaseKuduTest {
+
+  private static final String TABLE_NAME =
+    ITImportParquet.class.getName() + "-" + System.currentTimeMillis();
+
+  private static final HadoopTestingUtility HADOOP_UTIL = new HadoopTestingUtility();
+
+  private static Schema schema;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    BaseKuduTest.setUpBeforeClass();
+
+    ArrayList<ColumnSchema> columns = new ArrayList<ColumnSchema>(4);
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("key", Type.INT32)
+      .key(true)
+      .build());
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("column1_i", Type.INT32)
+      .build());
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("column2_d", Type.DOUBLE)
+      .build());
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("column3_s", Type.STRING)
+      .nullable(true)
+      .build());
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("column4_b", Type.BOOL)
+      .build());
+    schema = new Schema(columns);
+
+    createTable(TABLE_NAME, schema,
+      new CreateTableOptions().setRangePartitionColumns(ImmutableList.of("key")));
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    try {
+      BaseKuduTest.tearDownAfterClass();
+    } finally {
+      HADOOP_UTIL.cleanup();
+    }
+  }
+
+  @Test
+  public void test() throws Exception {
+    Configuration conf = new Configuration();
+    String testHome =
+      HADOOP_UTIL.setupAndGetTestDir(ITImportCsv.class.getName(), conf).getAbsolutePath();
+
+    // Create a Parquet input file with 4 records.
+    Path data = new Path(testHome, "data.parquet");
+    writeParquetFile(data, conf);
+
+    StringBuilder sb = new StringBuilder();
+    for (ColumnSchema col : schema.getColumns()) {
+      sb.append(col.getName());
+      sb.append(",");
+    }
+    sb.deleteCharAt(sb.length() - 1);
+    String[] args = new String[] { "-D" + CommandLineParser.MASTER_ADDRESSES_KEY + "=" + getMasterAddresses(),
+      TABLE_NAME, data.toString()};
+
+    GenericOptionsParser parser = new GenericOptionsParser(conf, args);
+    Job job = ImportParquet.createSubmittableJob(parser.getConfiguration(), parser.getRemainingArgs());
+    assertTrue("Test job did not end properly", job.waitForCompletion(true));
+
+    KuduTable openTable = openTable(TABLE_NAME);
+    assertEquals(4, countRowsInScan(
+      client.newScannerBuilder(openTable).build()));
+    assertEquals("INT32 key=1, INT32 column1_i=3, DOUBLE column2_d=2.3, STRING column3_s=some string, " +
+      "BOOL column4_b=true", scanTableToStrings(openTable).get(0));
+  }
+
+  private void writeParquetFile(Path data, Configuration conf) throws IOException {
+    MessageType schema = parseMessageType(
+      "message test { "
+        + "required int32 key; "
+        + "required int32 column1_i; "
+        + "required double column2_d; "
+        + "required binary column3_s; "
+        + "required boolean column4_b; "
+        + "} ");
+    GroupWriteSupport.setSchema(schema, conf);
+    SimpleGroupFactory f = new SimpleGroupFactory(schema);
+    ParquetWriter<Group> writer = new ParquetWriter<Group>(data, new GroupWriteSupport(),
+      UNCOMPRESSED, 1024, 1024, 512, true, false, ParquetProperties.WriterVersion.PARQUET_1_0, conf);
+
+    writer.write(f.newGroup().append("key", 1).append("column1_i", 3).append("column2_d", 2.3)
+        .append("column3_s", "some string").append("column4_b", true));
+    writer.write(f.newGroup().append("key", 2).append("column1_i", 5).append("column2_d", 4.5)
+        .append("column3_s", "some more").append("column4_b", false));
+    writer.write(f.newGroup().append("key", 3).append("column1_i", 7).append("column2_d", 5.6)
+        .append("column3_s", "some more and more").append("column4_b", true));
+    writer.write(f.newGroup().append("key", 4).append("column1_i", 9).append("column2_d", 10.9)
+        .append("column3_s", "some more and alst").append("column4_b", false));
+    writer.close();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kudu/blob/5d53a3b7/java/kudu-client-tools/src/test/java/org/apache/kudu/mapreduce/tools/ITImportParquetPreCheck.java
----------------------------------------------------------------------
diff --git a/java/kudu-client-tools/src/test/java/org/apache/kudu/mapreduce/tools/ITImportParquetPreCheck.java b/java/kudu-client-tools/src/test/java/org/apache/kudu/mapreduce/tools/ITImportParquetPreCheck.java
new file mode 100644
index 0000000..ab332ed
--- /dev/null
+++ b/java/kudu-client-tools/src/test/java/org/apache/kudu/mapreduce/tools/ITImportParquetPreCheck.java
@@ -0,0 +1,151 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.kudu.mapreduce.tools;
+
+import static org.apache.parquet.hadoop.metadata.CompressionCodecName.UNCOMPRESSED;
+import static org.apache.parquet.schema.MessageTypeParser.parseMessageType;
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.kudu.client.KuduTable;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroupFactory;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.example.GroupWriteSupport;
+import org.apache.parquet.schema.MessageType;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Schema;
+import org.apache.kudu.Type;
+import org.apache.kudu.client.BaseKuduTest;
+import org.apache.kudu.client.CreateTableOptions;
+import org.apache.kudu.mapreduce.CommandLineParser;
+import org.apache.kudu.mapreduce.HadoopTestingUtility;
+
+public class ITImportParquetPreCheck extends BaseKuduTest {
+
+  private static final String TABLE_NAME =
+    ITImportParquet.class.getName() + "-" + System.currentTimeMillis();
+
+  private static final HadoopTestingUtility HADOOP_UTIL = new HadoopTestingUtility();
+
+  private static Schema schema;
+
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    BaseKuduTest.setUpBeforeClass();
+
+    ArrayList<ColumnSchema> columns = new ArrayList<ColumnSchema>(4);
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("key", Type.INT32)
+      .key(true)
+      .build());
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("column1_i", Type.INT32)
+      .build());
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("column2_d", Type.DOUBLE)
+      .build());
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("column3_s", Type.STRING)
+      .nullable(true)
+      .build());
+    columns.add(new ColumnSchema.ColumnSchemaBuilder("column4_b", Type.BOOL)
+      .build());
+    schema = new Schema(columns);
+
+    createTable(TABLE_NAME, schema,
+      new CreateTableOptions().setRangePartitionColumns(ImmutableList.of("key")));
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    try {
+      BaseKuduTest.tearDownAfterClass();
+    } finally {
+      HADOOP_UTIL.cleanup();
+    }
+  }
+
+  @Test
+  public void test() throws Exception {
+    Configuration conf = new Configuration();
+    String testHome =
+      HADOOP_UTIL.setupAndGetTestDir(ITImportCsv.class.getName(), conf).getAbsolutePath();
+
+    // Create a Parquet input file with 4 records.
+    Path data = new Path(testHome, "data.parquet");
+    writeParquetFile(data, conf);
+
+    StringBuilder sb = new StringBuilder();
+    for (ColumnSchema col : schema.getColumns()) {
+      sb.append(col.getName());
+      sb.append(",");
+    }
+    sb.deleteCharAt(sb.length() - 1);
+    String[] args = new String[] { "-D" + CommandLineParser.MASTER_ADDRESSES_KEY + "=" +
+      getMasterAddresses(), TABLE_NAME, data.toString()};
+
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("The column column1_i does not exist in Parquet schema");
+
+    GenericOptionsParser parser = new GenericOptionsParser(conf, args);
+    Job job = ImportParquet.createSubmittableJob(parser.getConfiguration(), parser.getRemainingArgs());
+    job.waitForCompletion(true);
+
+    KuduTable openTable = openTable(TABLE_NAME);
+    assertEquals(0, countRowsInScan(client.newScannerBuilder(openTable).build()));
+  }
+
+  private void writeParquetFile(Path data, Configuration conf) throws IOException {
+    MessageType schema = parseMessageType(
+      "message test { "
+        + "required int32 key; "
+        + "required int32 column1_i_s; "
+        + "required binary column2_d; "
+        + "required binary column3_s; "
+        + "required boolean column4_b; "
+        + "} ");
+    GroupWriteSupport.setSchema(schema, conf);
+    SimpleGroupFactory f = new SimpleGroupFactory(schema);
+    ParquetWriter<Group> writer = new ParquetWriter<Group>(data, new GroupWriteSupport(),
+      UNCOMPRESSED, 1024, 1024, 512, true, false, ParquetProperties.WriterVersion.PARQUET_1_0, conf);
+
+    writer.write(f.newGroup().append("key", 1).append("column1_i_s", 292).append("column2_d", "no type")
+      .append("column3_s", "some string").append("column4_b", true));
+    writer.write(f.newGroup().append("key", 2).append("column1_i_s", 23).append("column2_d", "no type")
+      .append("column3_s", "some more").append("column4_b", false));
+    writer.write(f.newGroup().append("key", 3).append("column1_i_s", 32).append("column2_d", "no type")
+      .append("column3_s", "some more and more").append("column4_b", true));
+    writer.write(f.newGroup().append("key", 4).append("column1_i_s", 22).append("column2_d", "no type")
+      .append("column3_s", "some more and alst").append("column4_b", false));
+    writer.close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/kudu/blob/5d53a3b7/java/kudu-spark-tools/pom.xml
----------------------------------------------------------------------
diff --git a/java/kudu-spark-tools/pom.xml b/java/kudu-spark-tools/pom.xml
index c2eb57f..98ffe28 100644
--- a/java/kudu-spark-tools/pom.xml
+++ b/java/kudu-spark-tools/pom.xml
@@ -18,7 +18,8 @@
 // specific language governing permissions and limitations
 // under the License.
 -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
     <modelVersion>4.0.0</modelVersion>
     <parent>
         <groupId>org.apache.kudu</groupId>
@@ -98,6 +99,11 @@
             <scope>test</scope>
         </dependency>
         <dependency>
+            <groupId>com.databricks</groupId>
+            <artifactId>spark-avro_2.10</artifactId>
+            <version>${sparkavro.version}</version>
+        </dependency>
+        <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
             <version>${junit.version}</version>

http://git-wip-us.apache.org/repos/asf/kudu/blob/5d53a3b7/java/kudu-spark-tools/src/main/scala/org/apache/kudu/spark/tools/ImportExportFiles.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark-tools/src/main/scala/org/apache/kudu/spark/tools/ImportExportFiles.scala b/java/kudu-spark-tools/src/main/scala/org/apache/kudu/spark/tools/ImportExportFiles.scala
new file mode 100644
index 0000000..bc2f0a3
--- /dev/null
+++ b/java/kudu-spark-tools/src/main/scala/org/apache/kudu/spark/tools/ImportExportFiles.scala
@@ -0,0 +1,159 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.kudu.spark.tools
+
+import org.apache.kudu.client.KuduClient
+import org.apache.kudu.spark.tools.ImportExportKudu.ArgsCls
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.{SparkConf, SparkContext}
+import org.slf4j.{Logger, LoggerFactory}
+import org.apache.kudu.spark.kudu._
+import com.databricks.spark.avro
+import com.google.common.annotations.VisibleForTesting
+
+object ImportExportKudu {
+  val LOG: Logger = LoggerFactory.getLogger(ImportExportKudu.getClass)
+
+  def fail(msg: String): Nothing = {
+    System.err.println(msg)
+    sys.exit(1)
+  }
+
+  def usage: String =
+    s"""
+       | Usage: --operation=import/export --format=<data-format(csv,parquet,avro)> --master-addrs=<master-addrs> --path=<path> --table-name=<table-name>
+       |    where
+       |      operation: import or export data from or to Kudu tables, default: import
+       |      format: format of the data to import/export; supported formats are csv, parquet and avro, default: csv
+       |      masterAddrs: comma separated addresses of Kudu master nodes, default: localhost
+       |      path: path to input or output for the import/export operation, default: file://
+       |      tableName: table name to import/export, default: ""
+       |      columns: comma separated column names to select when exporting from the Kudu table, default: *
+       |      delimiter: delimiter for csv import/export, default: ,
+       |      header: whether the csv files have a header row, default: false
+     """.stripMargin
+
+  case class ArgsCls(operation: String = "import",
+                     format: String = "csv",
+                     masterAddrs: String = "localhost",
+                     path: String = "file://",
+                     tableName: String = "",
+                     columns: String = "*",
+                     delimiter: String = ",",
+                     header: String = "false",
+                     inferschema: String="false"
+                    )
+
+  object ArgsCls {
+    private def parseInner(options: ArgsCls, args: List[String]): ArgsCls = {
+      LOG.info(args.mkString(","))
+      args match {
+        case Nil => options
+        case "--help" :: _ =>
+          System.err.println(usage)
+          sys.exit(0)
+        case flag :: Nil => fail(s"flag $flag has no value\n$usage")
+        case flag :: value :: tail =>
+          val newOptions: ArgsCls = flag match {
+            case "--operation" => options.copy(operation = value)
+            case "--format" => options.copy(format = value)
+            case "--master-addrs" => options.copy(masterAddrs = value)
+            case "--path" => options.copy(path = value)
+            case "--table-name" => options.copy(tableName = value)
+            case "--columns" => options.copy(columns = value)
+            case "--delimiter" => options.copy(delimiter = value)
+            case "--header" => options.copy(header = value)
+            case "--inferschema" => options.copy(inferschema = value)
+            case _ => fail(s"unknown argument given $flag")
+          }
+          parseInner(newOptions, tail)
+      }
+    }
+
+    def parse(args: Array[String]): ArgsCls = {
+      parseInner(ArgsCls(), args.flatMap(_.split('=')).toList)
+    }
+  }
+}
+
+object ImportExportFiles {
+
+  import ImportExportKudu.{LOG, fail}
+
+  var sqlContext: SQLContext = _
+  var kuduOptions: Map[String, String] = _
+
+  def run(args: ArgsCls, sc: SparkContext, sqlContext: SQLContext): Unit = {
+    val kc = new KuduContext(args.masterAddrs, sc)
+    val applicationId = sc.applicationId
+
+    val client: KuduClient = kc.syncClient
+    if (!client.tableExists(args.tableName)) {
+      fail(args.tableName + s" table doesn't exist")
+    }
+
+    kuduOptions = Map(
+      "kudu.table" -> args.tableName,
+      "kudu.master" -> args.masterAddrs)
+
+    args.operation match {
+      case "import" =>
+        args.format match {
+          case "csv" =>
+            val df = sqlContext.read.option("header", args.header).option("delimiter", args.delimiter).csv(args.path)
+            kc.upsertRows(df, args.tableName)
+          case "parquet" =>
+            val df = sqlContext.read.parquet(args.path)
+            kc.upsertRows(df, args.tableName)
+          case "avro" =>
+            val df = sqlContext.read.format("com.databricks.spark.avro").load(args.path)
+            kc.upsertRows(df, args.tableName)
+          case _ => fail(s"unknown format given: ${args.format}")
+        }
+      case "export" =>
+        val df = sqlContext.read.options(kuduOptions).kudu.select(args.columns)
+        args.format match {
+          case "csv" =>
+            df.write.format("com.databricks.spark.csv").option("header", args.header).option("delimiter",
+              args.delimiter).save(args.path)
+          case "parquet" =>
+            df.write.parquet(args.path)
+          case "avro" =>
+            df.write.format("com.databricks.spark.avro").save(args.path)
+          case _ => fail(s"unknown format given: ${args.format}")
+        }
+      case _ => fail(s"unknown operation given: ${args.operation}")
+    }
+  }
+  /**
+    * Entry point for testing. SparkContext is a singleton,
+    * so tests must create and manage their own.
+    */
+  @VisibleForTesting
+  def testMain(args: Array[String], sc: SparkContext): Unit = {
+    sqlContext = new SQLContext(sc)
+    run(ArgsCls.parse(args), sc, sqlContext)
+  }
+
+  def main(args: Array[String]): Unit = {
+    val conf = new SparkConf().setAppName("Import or Export files from/to Kudu")
+    val sc = new SparkContext(conf)
+    testMain(args, sc)
+  }
+}
+
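
The Spark tool parses its own flag-style arguments, so a quick way to exercise it outside
of spark-submit is to call its main entry point directly, as in the Java sketch below; in
a real deployment it would normally be packaged and launched with spark-submit. The master
address, path, and table name are hypothetical placeholders.

    public class ImportExportFilesSketch {
      public static void main(String[] args) throws Exception {
        // Import an Avro file into an existing Kudu table (placeholder values);
        // a Spark master must be configured for the underlying SparkContext.
        org.apache.kudu.spark.tools.ImportExportFiles.main(new String[] {
            "--operation=import",
            "--format=avro",
            "--master-addrs=kudu-master-1:7051",
            "--path=hdfs:///user/example/events.avro",
            "--table-name=events"
        });
      }
    }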

http://git-wip-us.apache.org/repos/asf/kudu/blob/5d53a3b7/java/kudu-spark-tools/src/test/scala/org/apache/kudu/spark/tools/TestImportExportFiles.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark-tools/src/test/scala/org/apache/kudu/spark/tools/TestImportExportFiles.scala b/java/kudu-spark-tools/src/test/scala/org/apache/kudu/spark/tools/TestImportExportFiles.scala
new file mode 100644
index 0000000..2507853
--- /dev/null
+++ b/java/kudu-spark-tools/src/test/scala/org/apache/kudu/spark/tools/TestImportExportFiles.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kudu.spark.tools
+
+import java.io.{File, FileOutputStream}
+
+import org.apache.kudu.ColumnSchema.ColumnSchemaBuilder
+import org.apache.kudu.{Schema, Type}
+import org.apache.kudu.client.CreateTableOptions
+import org.apache.kudu.spark.kudu._
+import org.apache.spark.sql.SQLContext
+import org.junit.Assert._
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{FunSuite, Matchers}
+import org.spark_project.guava.collect.ImmutableList
+
+import scala.collection.JavaConverters._
+
+@RunWith(classOf[JUnitRunner])
+class TestImportExportFiles extends FunSuite with TestContext with Matchers {
+
+  private val TABLE_NAME: String = classOf[TestImportExportFiles].getName + "-" + System.currentTimeMillis
+  var sqlContext : SQLContext = _
+  var kuduOptions : Map[String, String] = _
+
+  test("Spark Import Export") {
+    val schema: Schema = {
+      val columns = ImmutableList.of(
+        new ColumnSchemaBuilder("key", Type.STRING).key(true).build(),
+        new ColumnSchemaBuilder("column1_i", Type.STRING).build(),
+        new ColumnSchemaBuilder("column2_d", Type.STRING).nullable(true).build(),
+        new ColumnSchemaBuilder("column3_s", Type.STRING).build(),
+        new ColumnSchemaBuilder("column4_b", Type.STRING).build())
+      new Schema(columns)
+    }
+    val tableOptions = new CreateTableOptions().setRangePartitionColumns(List("key").asJava)
+      .setNumReplicas(1)
+    kuduClient.createTable(TABLE_NAME, schema, tableOptions)
+
+    val data: File = new File("target/", TABLE_NAME+".csv")
+    writeCsvFile(data)
+
+    ImportExportFiles.testMain(Array("--operation=import",
+      "--format=csv",
+      s"--master-addrs=${miniCluster.getMasterAddresses}",
+      s"--path=${"target/"+TABLE_NAME+".csv"}",
+      s"--table-name=${TABLE_NAME}",
+      "--delimiter=,",
+      "--header=true",
+      "--inferschema=true"), sc)
+    val rdd = kuduContext.kuduRDD(sc, TABLE_NAME, List("key"))
+    assert(rdd.collect.length == 4)
+    assertEquals("[1],[2],[3],[4]", rdd.collect().mkString(","))
+  }
+
+  def writeCsvFile(data: File)
+  {
+    val fos: FileOutputStream = new FileOutputStream(data)
+    fos.write("key,column1_i,column2_d,column3_s,column4_b\n".getBytes)
+    fos.write("1,3,2.3,some string,true\n".getBytes)
+    fos.write("2,5,4.5,some more,false\n".getBytes)
+    fos.write("3,7,1.2,wait this is not a double bad row,true\n".getBytes)
+    fos.write("4,9,10.1,trailing separator isn't bad mkay?,true\n".getBytes)
+    fos.close()
+  }
+}

http://git-wip-us.apache.org/repos/asf/kudu/blob/5d53a3b7/java/pom.xml
----------------------------------------------------------------------
diff --git a/java/pom.xml b/java/pom.xml
index 5893735..fed039a 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -29,10 +29,10 @@
 
     <!-- inherit from the ASF POM for distribution management -->
     <parent>
-      <groupId>org.apache</groupId>
-      <artifactId>apache</artifactId>
-      <version>18</version>
-      <relativePath/>
+        <groupId>org.apache</groupId>
+        <artifactId>apache</artifactId>
+        <version>18</version>
+        <relativePath/>
     </parent>
 
     <name>Kudu</name>
@@ -72,6 +72,8 @@
         <protobuf.version>3.3.0</protobuf.version>
         <slf4j.version>1.7.25</slf4j.version>
         <yetus.version>0.4.0</yetus.version>
+        <parquet.version>1.9.0</parquet.version>
+        <sparkavro.version>3.2.0</sparkavro.version>
 
         <!-- Scala Library dependencies -->
         <spark1.version>1.6.3</spark1.version>
@@ -83,8 +85,9 @@
         <!-- Misc variables -->
         <testdata.dir>target/testdata</testdata.dir>
         <testArgLine>-enableassertions -Xmx1900m
-        -Djava.security.egd=file:/dev/./urandom -Djava.net.preferIPv4Stack=true
-        -Djava.awt.headless=true</testArgLine>
+            -Djava.security.egd=file:/dev/./urandom -Djava.net.preferIPv4Stack=true
+            -Djava.awt.headless=true
+        </testArgLine>
     </properties>
 
     <modules>
