Repository: sqoop
Updated Branches:
  refs/heads/trunk 7e8a6fd89 -> 666700d33


SQOOP-1394: Export data from HDFS back to an RDBMS

(Qian Xu via Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/666700d3
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/666700d3
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/666700d3

Branch: refs/heads/trunk
Commit: 666700d33fbe064b74d239188fee5f8f3a96a328
Parents: 7e8a6fd
Author: Jarek Jarcec Cecho <[email protected]>
Authored: Tue Sep 9 15:30:40 2014 +0200
Committer: Jarek Jarcec Cecho <[email protected]>
Committed: Tue Sep 9 15:30:40 2014 +0200

----------------------------------------------------------------------
 .../apache/sqoop/mapreduce/ExportJobBase.java   |   5 +-
 .../apache/sqoop/mapreduce/JdbcExportJob.java   |  73 ++--
 .../sqoop/mapreduce/ParquetExportMapper.java    |  43 ++
 .../com/cloudera/sqoop/TestParquetExport.java   | 389 +++++++++++++++++++
 4 files changed, 482 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/666700d3/src/java/org/apache/sqoop/mapreduce/ExportJobBase.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/ExportJobBase.java b/src/java/org/apache/sqoop/mapreduce/ExportJobBase.java
index 54c27ee..100c73c 100644
--- a/src/java/org/apache/sqoop/mapreduce/ExportJobBase.java
+++ b/src/java/org/apache/sqoop/mapreduce/ExportJobBase.java
@@ -58,7 +58,7 @@ public class ExportJobBase extends JobBase {
    * The (inferred) type of a file or group of files.
    */
   public enum FileType {
-    SEQUENCE_FILE, AVRO_DATA_FILE, HCATALOG_MANAGED_FILE, UNKNOWN
+    SEQUENCE_FILE, AVRO_DATA_FILE, HCATALOG_MANAGED_FILE, PARQUET_FILE, UNKNOWN
   }
 
   public static final Log LOG = LogFactory.getLog(
@@ -190,6 +190,9 @@ public class ExportJobBase extends JobBase {
     if (header[0] == 'O' && header[1] == 'b' && header[2] == 'j') {
       return FileType.AVRO_DATA_FILE;
     }
+    if (header[0] == 'P' && header[1] == 'A' && header[2] == 'R') {
+      return FileType.PARQUET_FILE;
+    }
     return FileType.UNKNOWN;
   }
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/666700d3/src/java/org/apache/sqoop/mapreduce/JdbcExportJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/JdbcExportJob.java b/src/java/org/apache/sqoop/mapreduce/JdbcExportJob.java
index fee78e0..93d438a 100644
--- a/src/java/org/apache/sqoop/mapreduce/JdbcExportJob.java
+++ b/src/java/org/apache/sqoop/mapreduce/JdbcExportJob.java
@@ -18,10 +18,14 @@
 
 package org.apache.sqoop.mapreduce;
 
-import java.io.IOException;
-import java.util.Map;
+import com.cloudera.sqoop.manager.ConnManager;
+import com.cloudera.sqoop.manager.ExportJobContext;
+import com.cloudera.sqoop.mapreduce.ExportJobBase;
+import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
+import com.cloudera.sqoop.mapreduce.db.DBOutputFormat;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.io.DefaultStringifier;
 import org.apache.hadoop.io.MapWritable;
 import org.apache.hadoop.io.Text;
@@ -30,11 +34,10 @@ import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.sqoop.mapreduce.hcat.SqoopHCatUtilities;
-import com.cloudera.sqoop.manager.ConnManager;
-import com.cloudera.sqoop.manager.ExportJobContext;
-import com.cloudera.sqoop.mapreduce.ExportJobBase;
-import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
-import com.cloudera.sqoop.mapreduce.db.DBOutputFormat;
+import org.kitesdk.data.mapreduce.DatasetKeyInputFormat;
+
+import java.io.IOException;
+import java.util.Map;
 
 /**
  * Run an export using JDBC (JDBC-based ExportOutputFormat).
@@ -72,27 +75,37 @@ public class JdbcExportJob extends ExportJobBase {
       return;
     } else if (fileType == FileType.AVRO_DATA_FILE) {
       LOG.debug("Configuring for Avro export");
-      ConnManager connManager = context.getConnManager();
-      Map<String, Integer> columnTypeInts;
-      if (options.getCall() == null) {
-        columnTypeInts = connManager.getColumnTypes(
+      configureGenericRecordExportInputFormat(job, tableName);
+    } else if (fileType == FileType.PARQUET_FILE) {
+      LOG.debug("Configuring for Parquet export");
+      configureGenericRecordExportInputFormat(job, tableName);
+      FileSystem fs = FileSystem.get(job.getConfiguration());
+      String uri = "dataset:" + fs.makeQualified(getInputPath());
+      DatasetKeyInputFormat.configure(job).readFrom(uri);
+    }
+  }
+
+  private void configureGenericRecordExportInputFormat(Job job, String tableName)
+      throws IOException {
+    ConnManager connManager = context.getConnManager();
+    Map<String, Integer> columnTypeInts;
+    if (options.getCall() == null) {
+      columnTypeInts = connManager.getColumnTypes(
           tableName,
           options.getSqlQuery());
-      } else {
-        columnTypeInts = connManager.getColumnTypesForProcedure(
+    } else {
+      columnTypeInts = connManager.getColumnTypesForProcedure(
           options.getCall());
-      }
-      MapWritable columnTypes = new MapWritable();
-      for (Map.Entry<String, Integer> e : columnTypeInts.entrySet()) {
-        Text columnName = new Text(e.getKey());
-        Text columnText = new Text(
-            connManager.toJavaType(tableName, e.getKey(), e.getValue()));
-        columnTypes.put(columnName, columnText);
-      }
-      DefaultStringifier.store(job.getConfiguration(), columnTypes,
-          AvroExportMapper.AVRO_COLUMN_TYPES_MAP);
     }
-
+    MapWritable columnTypes = new MapWritable();
+    for (Map.Entry<String, Integer> e : columnTypeInts.entrySet()) {
+      Text columnName = new Text(e.getKey());
+      Text columnText = new Text(
+          connManager.toJavaType(tableName, e.getKey(), e.getValue()));
+      columnTypes.put(columnName, columnText);
+    }
+    DefaultStringifier.store(job.getConfiguration(), columnTypes,
+        AvroExportMapper.AVRO_COLUMN_TYPES_MAP);
   }
 
   @Override
@@ -101,10 +114,14 @@ public class JdbcExportJob extends ExportJobBase {
     if (isHCatJob) {
       return SqoopHCatUtilities.getInputFormatClass();
     }
-    if (fileType == FileType.AVRO_DATA_FILE) {
-      return AvroInputFormat.class;
+    switch (fileType) {
+      case AVRO_DATA_FILE:
+        return AvroInputFormat.class;
+      case PARQUET_FILE:
+        return DatasetKeyInputFormat.class;
+      default:
+        return super.getInputFormatClass();
     }
-    return super.getInputFormatClass();
   }
 
   @Override
@@ -117,6 +134,8 @@ public class JdbcExportJob extends ExportJobBase {
         return SequenceFileExportMapper.class;
       case AVRO_DATA_FILE:
         return AvroExportMapper.class;
+      case PARQUET_FILE:
+        return ParquetExportMapper.class;
       case UNKNOWN:
       default:
         return TextExportMapper.class;

http://git-wip-us.apache.org/repos/asf/sqoop/blob/666700d3/src/java/org/apache/sqoop/mapreduce/ParquetExportMapper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/ParquetExportMapper.java b/src/java/org/apache/sqoop/mapreduce/ParquetExportMapper.java
new file mode 100644
index 0000000..2bc0cba
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/ParquetExportMapper.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.io.NullWritable;
+
+import java.io.IOException;
+
+/**
+ * Exports Parquet records from a data source.
+ */
+public class ParquetExportMapper
+    extends GenericRecordExportMapper<GenericRecord, NullWritable> {
+
+  @Override
+  protected void setup(Context context) throws IOException, InterruptedException {
+    super.setup(context);
+  }
+
+  @Override
+  protected void map(GenericRecord key, NullWritable val,
+      Context context) throws IOException, InterruptedException {
+    context.write(toSqoopRecord(key), NullWritable.get());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/666700d3/src/test/com/cloudera/sqoop/TestParquetExport.java
----------------------------------------------------------------------
diff --git a/src/test/com/cloudera/sqoop/TestParquetExport.java b/src/test/com/cloudera/sqoop/TestParquetExport.java
new file mode 100644
index 0000000..9065daf
--- /dev/null
+++ b/src/test/com/cloudera/sqoop/TestParquetExport.java
@@ -0,0 +1,389 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.cloudera.sqoop;
+
+import com.cloudera.sqoop.testutil.ExportJobTestCase;
+import com.google.common.collect.Lists;
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.kitesdk.data.*;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertArrayEquals;
+
+/**
+ * Test that we can export Parquet Data Files from HDFS into databases.
+ */
+public class TestParquetExport extends ExportJobTestCase {
+
+  /**
+   * @return an argv for the CodeGenTool to use when creating tables to export.
+   */
+  protected String [] getCodeGenArgv(String... extraArgs) {
+    List<String> codeGenArgv = new ArrayList<String>();
+
+    if (null != extraArgs) {
+      for (String arg : extraArgs) {
+        codeGenArgv.add(arg);
+      }
+    }
+
+    codeGenArgv.add("--table");
+    codeGenArgv.add(getTableName());
+    codeGenArgv.add("--connect");
+    codeGenArgv.add(getConnectString());
+
+    return codeGenArgv.toArray(new String[0]);
+  }
+
+  /** When generating data for export tests, each column is generated
+      according to a ColumnGenerator. Methods exist for determining
+      what to put into Parquet objects in the files to export, as well
+      as what the object representation of the column as returned by
+      the database should look like.
+    */
+  public interface ColumnGenerator {
+    /** For a row with id rowNum, what should we write into that
+        Parquet record to export?
+      */
+    Object getExportValue(int rowNum);
+
+    /** Return the Parquet schema for the field. */
+    Schema getColumnParquetSchema();
+
+    /** For a row with id rowNum, what should the database return
+        for the given column's value?
+      */
+    Object getVerifyValue(int rowNum);
+
+    /** Return the column type to put in the CREATE TABLE statement. */
+    String getColumnType();
+  }
+
+  private ColumnGenerator colGenerator(final Object exportValue,
+      final Schema schema, final Object verifyValue,
+      final String columnType) {
+    return new ColumnGenerator() {
+      @Override
+      public Object getVerifyValue(int rowNum) {
+        return verifyValue;
+      }
+      @Override
+      public Object getExportValue(int rowNum) {
+        return exportValue;
+      }
+      @Override
+      public String getColumnType() {
+        return columnType;
+      }
+      @Override
+      public Schema getColumnParquetSchema() {
+        return schema;
+      }
+    };
+  }
+
+  /**
+   * Create a data file that gets exported to the db.
+   * @param fileNum the number of the file (for multi-file export)
+   * @param numRecords how many records to write to the file.
+   */
+  protected void createParquetFile(int fileNum, int numRecords,
+      ColumnGenerator... extraCols) throws IOException {
+
+    String uri = "dataset:file:" + getTablePath();
+    Schema schema = buildSchema(extraCols);
+    DatasetDescriptor descriptor = new DatasetDescriptor.Builder()
+      .schema(schema)
+      .format(Formats.PARQUET)
+      .build();
+    Dataset dataset = Datasets.create(uri, descriptor);
+    DatasetWriter writer = dataset.newWriter();
+    try {
+      for (int i = 0; i < numRecords; i++) {
+        GenericRecord record = new GenericData.Record(schema);
+        record.put("id", i);
+        record.put("msg", getMsgPrefix() + i);
+        addExtraColumns(record, i, extraCols);
+        writer.write(record);
+      }
+    } finally {
+      writer.close();
+    }
+  }
+
+  private Schema buildSchema(ColumnGenerator... extraCols) {
+    List<Field> fields = new ArrayList<Field>();
+    fields.add(buildField("id", Schema.Type.INT));
+    fields.add(buildField("msg", Schema.Type.STRING));
+    int colNum = 0;
+    for (ColumnGenerator gen : extraCols) {
+      if (gen.getColumnParquetSchema() != null) {
+        fields.add(buildParquetField(forIdx(colNum++),
+            gen.getColumnParquetSchema()));
+      }
+    }
+    Schema schema = Schema.createRecord("myschema", null, null, false);
+    schema.setFields(fields);
+    return schema;
+  }
+
+  private void addExtraColumns(GenericRecord record, int rowNum,
+      ColumnGenerator[] extraCols) {
+    int colNum = 0;
+    for (ColumnGenerator gen : extraCols) {
+      if (gen.getColumnParquetSchema() != null) {
+        record.put(forIdx(colNum++), gen.getExportValue(rowNum));
+      }
+    }
+  }
+
+  private Field buildField(String name, Schema.Type type) {
+    return new Field(name, Schema.create(type), null, null);
+  }
+
+  private Field buildParquetField(String name, Schema schema) {
+    return new Field(name, schema, null, null);
+  }
+
+  /** Return the column name for a column index.
+   *  Each table contains two columns named 'id' and 'msg', and then an
+   *  arbitrary number of additional columns defined by ColumnGenerators.
+   *  These columns are referenced by idx 0, 1, 2...
+   *  @param idx the index of the ColumnGenerator in the array passed to
+   *   createTable().
+   *  @return the name of the column
+   */
+  protected String forIdx(int idx) {
+    return "col" + idx;
+  }
+
+  /**
+   * Return a SQL statement that drops a table, if it exists.
+   * @param tableName the table to drop.
+   * @return the SQL statement to drop that table.
+   */
+  protected String getDropTableStatement(String tableName) {
+    return "DROP TABLE " + tableName + " IF EXISTS";
+  }
+
+  /** Create the table definition to export to, removing any prior table.
+      By specifying ColumnGenerator arguments, you can add extra columns
+      to the table of arbitrary type.
+   */
+  private void createTable(ColumnGenerator... extraColumns)
+      throws SQLException {
+    Connection conn = getConnection();
+    PreparedStatement statement = conn.prepareStatement(
+        getDropTableStatement(getTableName()),
+        ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+    try {
+      statement.executeUpdate();
+      conn.commit();
+    } finally {
+      statement.close();
+    }
+
+    StringBuilder sb = new StringBuilder();
+    sb.append("CREATE TABLE ");
+    sb.append(getTableName());
+    sb.append(" (id INT NOT NULL PRIMARY KEY, msg VARCHAR(64)");
+    int colNum = 0;
+    for (ColumnGenerator gen : extraColumns) {
+      if (gen.getColumnType() != null) {
+        sb.append(", " + forIdx(colNum++) + " " + gen.getColumnType());
+      }
+    }
+    sb.append(")");
+
+    statement = conn.prepareStatement(sb.toString(),
+        ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+    try {
+      statement.executeUpdate();
+      conn.commit();
+    } finally {
+      statement.close();
+    }
+  }
+
+  /** Verify that on a given row, a column has a given value.
+   * @param id the id column specifying the row to test.
+   */
+  private void assertColValForRowId(int id, String colName, Object expectedVal)
+      throws SQLException {
+    Connection conn = getConnection();
+    LOG.info("Verifying column " + colName + " has value " + expectedVal);
+
+    PreparedStatement statement = conn.prepareStatement(
+        "SELECT " + colName + " FROM " + getTableName() + " WHERE id = " + id,
+        ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+    Object actualVal = null;
+    try {
+      ResultSet rs = statement.executeQuery();
+      try {
+        rs.next();
+        actualVal = rs.getObject(1);
+      } finally {
+        rs.close();
+      }
+    } finally {
+      statement.close();
+    }
+
+    if (expectedVal != null && expectedVal instanceof byte[]) {
+      assertArrayEquals((byte[]) expectedVal, (byte[]) actualVal);
+    } else {
+      assertEquals("Got unexpected column value", expectedVal, actualVal);
+    }
+  }
+
+  /** Verify that for the max and min values of the 'id' column, the values
+      for a given column meet the expected values.
+   */
+  protected void assertColMinAndMax(String colName, ColumnGenerator generator)
+      throws SQLException {
+    Connection conn = getConnection();
+    int minId = getMinRowId(conn);
+    int maxId = getMaxRowId(conn);
+
+    LOG.info("Checking min/max for column " + colName + " with type "
+        + generator.getColumnType());
+
+    Object expectedMin = generator.getVerifyValue(minId);
+    Object expectedMax = generator.getVerifyValue(maxId);
+
+    assertColValForRowId(minId, colName, expectedMin);
+    assertColValForRowId(maxId, colName, expectedMax);
+  }
+
+  public void testSupportedParquetTypes() throws IOException, SQLException {
+    String[] argv = {};
+    final int TOTAL_RECORDS = 1 * 10;
+
+    byte[] b = new byte[] { (byte) 1, (byte) 2 };
+    Schema fixed = Schema.createFixed("myfixed", null, null, 2);
+    Schema enumeration = Schema.createEnum("myenum", null, null,
+        Lists.newArrayList("a", "b"));
+
+    ColumnGenerator[] gens = new ColumnGenerator[] {
+      colGenerator(true, Schema.create(Schema.Type.BOOLEAN), true, "BIT"),
+      colGenerator(100, Schema.create(Schema.Type.INT), 100, "INTEGER"),
+      colGenerator(200L, Schema.create(Schema.Type.LONG), 200L, "BIGINT"),
+      // HSQLDB maps REAL to double, not float:
+      colGenerator(1.0f, Schema.create(Schema.Type.FLOAT), 1.0d, "REAL"),
+      colGenerator(2.0d, Schema.create(Schema.Type.DOUBLE), 2.0d, "DOUBLE"),
+      colGenerator("s", Schema.create(Schema.Type.STRING), "s", "VARCHAR(8)"),
+      colGenerator(ByteBuffer.wrap(b), Schema.create(Schema.Type.BYTES),
+          b, "VARBINARY(8)"),
+      colGenerator(new GenericData.Fixed(fixed, b), fixed,
+          b, "BINARY(2)"),
+      colGenerator(new GenericData.EnumSymbol(enumeration, "a"), enumeration,
+          "a", "VARCHAR(8)"),
+    };
+    createParquetFile(0, TOTAL_RECORDS, gens);
+    createTable(gens);
+    runExport(getArgv(true, 10, 10, newStrArray(argv, "-m", "" + 1)));
+    verifyExport(TOTAL_RECORDS);
+    for (int i = 0; i < gens.length; i++) {
+      assertColMinAndMax(forIdx(i), gens[i]);
+    }
+  }
+
+  public void testNullableField() throws IOException, SQLException {
+    String[] argv = {};
+    final int TOTAL_RECORDS = 1 * 10;
+
+    List<Schema> childSchemas = new ArrayList<Schema>();
+    childSchemas.add(Schema.create(Schema.Type.STRING));
+    childSchemas.add(Schema.create(Schema.Type.NULL));
+    Schema schema =  Schema.createUnion(childSchemas);
+    ColumnGenerator gen0 = colGenerator(null, schema, null, "VARCHAR(64)");
+    ColumnGenerator gen1 = colGenerator("s", schema, "s", "VARCHAR(64)");
+    createParquetFile(0, TOTAL_RECORDS, gen0, gen1);
+    createTable(gen0, gen1);
+    runExport(getArgv(true, 10, 10, newStrArray(argv, "-m", "" + 1)));
+    verifyExport(TOTAL_RECORDS);
+    assertColMinAndMax(forIdx(0), gen0);
+    assertColMinAndMax(forIdx(1), gen1);
+  }
+
+  public void testParquetRecordsNotSupported() throws IOException, SQLException {
+    String[] argv = {};
+    final int TOTAL_RECORDS = 1;
+
+    Schema schema =  Schema.createRecord("nestedrecord", null, null, false);
+    schema.setFields(Lists.newArrayList(buildField("myint",
+        Schema.Type.INT)));
+    GenericRecord record = new GenericData.Record(schema);
+    record.put("myint", 100);
+    // DB type is not used so can be anything:
+    ColumnGenerator gen = colGenerator(record, schema, null, "VARCHAR(64)");
+    createParquetFile(0, TOTAL_RECORDS,  gen);
+    createTable(gen);
+    try {
+      runExport(getArgv(true, 10, 10, newStrArray(argv, "-m", "" + 1)));
+      fail("Parquet records can not be exported.");
+    } catch (Exception e) {
+      // expected
+      assertTrue(true);
+    }
+  }
+
+  public void testMissingDatabaseFields() throws IOException, SQLException {
+    String[] argv = {};
+    final int TOTAL_RECORDS = 1;
+
+    // null column type means don't create a database column
+    // the Parquet value will not be exported
+    ColumnGenerator gen = colGenerator(100, Schema.create(Schema.Type.INT),
+        null, null);
+    createParquetFile(0, TOTAL_RECORDS, gen);
+    createTable(gen);
+    runExport(getArgv(true, 10, 10, newStrArray(argv, "-m", "" + 1)));
+    verifyExport(TOTAL_RECORDS);
+  }
+
+  public void testMissingParquetFields()  throws IOException, SQLException {
+    String[] argv = {};
+    final int TOTAL_RECORDS = 1;
+
+    // null Parquet schema means don't create a Parquet field
+    ColumnGenerator gen = colGenerator(null, null, null, "VARCHAR(64)");
+    createParquetFile(0, TOTAL_RECORDS, gen);
+    createTable(gen);
+    try {
+      runExport(getArgv(true, 10, 10, newStrArray(argv, "-m", "" + 1)));
+      fail("Missing Parquet field.");
+    } catch (Exception e) {
+      // expected
+      assertTrue(true);
+    }
+  }
+
+}

Reply via email to