Repository: sqoop
Updated Branches:
  refs/heads/trunk 89366b49b -> df3a81b50
SQOOP-2295: Hive import with Parquet should append automatically (Qian Xu via Abraham Elmahrek)

Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/df3a81b5
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/df3a81b5
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/df3a81b5

Branch: refs/heads/trunk
Commit: df3a81b50fe8d2b3c501706fd5d2937804cad170
Parents: 89366b4
Author: Abraham Elmahrek <[email protected]>
Authored: Fri May 8 00:19:34 2015 -0700
Committer: Abraham Elmahrek <[email protected]>
Committed: Fri May 8 00:19:34 2015 -0700

----------------------------------------------------------------------
 src/docs/man/hive-args.txt                      |   2 +-
 src/docs/man/sqoop-create-hive-table.txt        |   2 +-
 src/docs/user/create-hive-table.txt             |   2 +-
 src/docs/user/hive-args.txt                     |   2 +-
 .../sqoop/mapreduce/DataDrivenImportJob.java    |  24 ++++-
 .../org/apache/sqoop/mapreduce/ParquetJob.java  |  26 +++--
 .../com/cloudera/sqoop/TestParquetImport.java   |   4 +-
 .../com/cloudera/sqoop/hive/TestHiveImport.java | 104 +++++++++++++++++--
 .../sqoop/testutil/BaseSqoopTestCase.java       |   9 +-
 .../sqoop/testutil/ImportJobTestCase.java       |  23 ++--
 testdata/hive/scripts/normalImportAsParquet.q   |  17 ---
 11 files changed, 153 insertions(+), 62 deletions(-)
----------------------------------------------------------------------
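In short: for Hive Parquet imports, re-running an import against an existing table now appends to it (with a warning) instead of failing, and --hive-overwrite replaces the existing data; plain HDFS Parquet imports keep failing if the destination dataset already exists. Internally, the two boolean parameters of ParquetJob.configureImportJob are replaced by an explicit WriteMode (DEFAULT, APPEND, OVERWRITE) that is mapped onto the corresponding Kite dataset write modes, as the diffs below show.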

http://git-wip-us.apache.org/repos/asf/sqoop/blob/df3a81b5/src/docs/man/hive-args.txt
----------------------------------------------------------------------
diff --git a/src/docs/man/hive-args.txt b/src/docs/man/hive-args.txt
index 7d9e427..b92a446 100644
--- a/src/docs/man/hive-args.txt
+++ b/src/docs/man/hive-args.txt
@@ -29,7 +29,7 @@ Hive options
   Overwrites existing data in the hive table if it exists.
 
 --create-hive-table::
-  If set, then the job will fail if the target hive table exits.
+  If set, then the job will fail if the target hive table exists.
   By default this property is false.
 
 --hive-table (table-name)::

http://git-wip-us.apache.org/repos/asf/sqoop/blob/df3a81b5/src/docs/man/sqoop-create-hive-table.txt
----------------------------------------------------------------------
diff --git a/src/docs/man/sqoop-create-hive-table.txt b/src/docs/man/sqoop-create-hive-table.txt
index 7aebcc1..afae9d0 100644
--- a/src/docs/man/sqoop-create-hive-table.txt
+++ b/src/docs/man/sqoop-create-hive-table.txt
@@ -35,7 +35,7 @@ Hive options
   Overwrites existing data in the hive table if it exists.
 
 --create-hive-table::
-  If set, then the job will fail if the target hive table exits.
+  If set, then the job will fail if the target hive table exists.
   By default this property is false.
 
 --hive-table (table-name)::

http://git-wip-us.apache.org/repos/asf/sqoop/blob/df3a81b5/src/docs/user/create-hive-table.txt
----------------------------------------------------------------------
diff --git a/src/docs/user/create-hive-table.txt b/src/docs/user/create-hive-table.txt
index 3aa34fd..dceb204 100644
--- a/src/docs/user/create-hive-table.txt
+++ b/src/docs/user/create-hive-table.txt
@@ -50,7 +50,7 @@ Argument                      Description
 +\--hive-home <dir>+          Override +$HIVE_HOME+
 +\--hive-overwrite+           Overwrite existing data in the Hive table.
 +\--create-hive-table+        If set, then the job will fail if the target hive
-                              table exits. By default this property is false.
+                              table exists. By default this property is false.
 +\--hive-table <table-name>+  Sets the table name to use when importing \
                               to Hive.
 +\--table+                    The database table to read the \

http://git-wip-us.apache.org/repos/asf/sqoop/blob/df3a81b5/src/docs/user/hive-args.txt
----------------------------------------------------------------------
diff --git a/src/docs/user/hive-args.txt b/src/docs/user/hive-args.txt
index 53de92d..e54ee1e 100644
--- a/src/docs/user/hive-args.txt
+++ b/src/docs/user/hive-args.txt
@@ -28,7 +28,7 @@ Argument                      Description
                               default delimiters if none are set.)
 +\--hive-overwrite+           Overwrite existing data in the Hive table.
 +\--create-hive-table+        If set, then the job will fail if the target hive
-                              table exits. By default this property is false.
+                              table exists. By default this property is false.
 +\--hive-table <table-name>+  Sets the table name to use when importing\
                               to Hive.
 +\--hive-drop-import-delims+  Drops '\n', '\r', and '\01' from string\

http://git-wip-us.apache.org/repos/asf/sqoop/blob/df3a81b5/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java b/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java
index d5bfae2..7521464 100644
--- a/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java
+++ b/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java
@@ -47,6 +47,7 @@ import com.cloudera.sqoop.mapreduce.ImportJobBase;
 import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
 import com.cloudera.sqoop.mapreduce.db.DataDrivenDBInputFormat;
 import com.cloudera.sqoop.orm.AvroSchemaGenerator;
+import org.kitesdk.data.Datasets;
 import org.kitesdk.data.mapreduce.DatasetKeyOutputFormat;
 
 /**
@@ -105,8 +106,27 @@ public class DataDrivenImportJob extends ImportJobBase {
       final String schemaNameOverride = tableName;
       Schema schema = generateAvroSchema(tableName, schemaNameOverride);
       String uri = getKiteUri(conf, tableName);
-      ParquetJob.configureImportJob(conf, schema, uri, options.isAppendMode(),
-          options.doHiveImport() && options.doOverwriteHiveTable());
+      ParquetJob.WriteMode writeMode;
+
+      if (options.doHiveImport()) {
+        if (options.doOverwriteHiveTable()) {
+          writeMode = ParquetJob.WriteMode.OVERWRITE;
+        } else {
+          writeMode = ParquetJob.WriteMode.APPEND;
+          if (Datasets.exists(uri)) {
+            LOG.warn("Target Hive table '" + tableName + "' exists! Sqoop will "
+                + "append data into the existing Hive table. Consider using "
+                + "--hive-overwrite if you do NOT intend to append data.");
+          }
+        }
+      } else {
+        // Note that there is no import argument for overwriting an HDFS
+        // dataset, so overwrite mode is not supported yet.
+        // Sqoop's append mode means merging two independent datasets,
+        // so we choose DEFAULT as the write mode.
+        writeMode = ParquetJob.WriteMode.DEFAULT;
+      }
+      ParquetJob.configureImportJob(conf, schema, uri, writeMode);
     }
 
     job.setMapperClass(getMapperClass());
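For reference, the decision above boils down to a mapping from user-facing flags to write modes. The following is a sketch only, not part of the patch; resolveWriteMode is a hypothetical helper, and it is placed in the same package because the WriteMode enum introduced below is package-private:

    package org.apache.sqoop.mapreduce;

    import com.cloudera.sqoop.SqoopOptions;

    final class WriteModeSketch {
      // APPEND:    --hive-import --as-parquetfile into an existing table (warns).
      // OVERWRITE: --hive-import --as-parquetfile --hive-overwrite.
      // DEFAULT:   non-Hive Parquet import; an existing destination dataset is
      //            an error (--append merges directories outside this code path).
      static ParquetJob.WriteMode resolveWriteMode(SqoopOptions options) {
        if (!options.doHiveImport()) {
          return ParquetJob.WriteMode.DEFAULT;
        }
        return options.doOverwriteHiveTable()
            ? ParquetJob.WriteMode.OVERWRITE
            : ParquetJob.WriteMode.APPEND;
      }
    }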

http://git-wip-us.apache.org/repos/asf/sqoop/blob/df3a81b5/src/java/org/apache/sqoop/mapreduce/ParquetJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/ParquetJob.java b/src/java/org/apache/sqoop/mapreduce/ParquetJob.java
index df55dbc..c775ef3 100644
--- a/src/java/org/apache/sqoop/mapreduce/ParquetJob.java
+++ b/src/java/org/apache/sqoop/mapreduce/ParquetJob.java
@@ -26,7 +26,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.kitesdk.data.CompressionType;
 import org.kitesdk.data.Dataset;
 import org.kitesdk.data.DatasetDescriptor;
-import org.kitesdk.data.DatasetNotFoundException;
 import org.kitesdk.data.Datasets;
 import org.kitesdk.data.Formats;
 import org.kitesdk.data.mapreduce.DatasetKeyOutputFormat;
@@ -46,6 +45,9 @@ public final class ParquetJob {
   private static final String CONF_AVRO_SCHEMA = "parquetjob.avro.schema";
   static final String CONF_OUTPUT_CODEC = "parquetjob.output.codec";
 
+  enum WriteMode {
+    DEFAULT, APPEND, OVERWRITE
+  };
 
   public static Schema getAvroSchema(Configuration conf) {
     return new Schema.Parser().parse(conf.get(CONF_AVRO_SCHEMA));
@@ -71,14 +73,14 @@ public final class ParquetJob {
    * {@link org.apache.avro.generic.GenericRecord}.
    */
   public static void configureImportJob(Configuration conf, Schema schema,
-      String uri, boolean reuseExistingDataset, boolean overwrite) throws IOException {
+      String uri, WriteMode writeMode) throws IOException {
     Dataset dataset;
-    if (reuseExistingDataset || overwrite) {
-      try {
-        dataset = Datasets.load(uri);
-      } catch (DatasetNotFoundException ex) {
-        dataset = createDataset(schema, getCompressionType(conf), uri);
+    if (Datasets.exists(uri)) {
+      if (WriteMode.DEFAULT.equals(writeMode)) {
+        throw new IOException("Destination exists! " + uri);
       }
+
+      dataset = Datasets.load(uri);
       Schema writtenWith = dataset.getDescriptor().getSchema();
       if (!SchemaValidationUtil.canRead(writtenWith, schema)) {
         throw new IOException(
@@ -90,10 +92,14 @@ public final class ParquetJob {
     }
     conf.set(CONF_AVRO_SCHEMA, schema.toString());
 
-    if (overwrite) {
-      DatasetKeyOutputFormat.configure(conf).overwrite(dataset);
+    DatasetKeyOutputFormat.ConfigBuilder builder =
+        DatasetKeyOutputFormat.configure(conf);
+    if (WriteMode.OVERWRITE.equals(writeMode)) {
+      builder.overwrite(dataset);
+    } else if (WriteMode.APPEND.equals(writeMode)) {
+      builder.appendTo(dataset);
     } else {
-      DatasetKeyOutputFormat.configure(conf).writeTo(dataset);
+      builder.writeTo(dataset);
     }
   }
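For readers unfamiliar with the Kite SDK calls used here: Datasets.exists/load resolve a dataset by URI, and DatasetKeyOutputFormat's ConfigBuilder binds the MapReduce output to it. A minimal standalone sketch of the append path, not the committed code, with an illustrative dataset URI:

    import org.apache.avro.generic.GenericRecord;
    import org.apache.hadoop.conf.Configuration;
    import org.kitesdk.data.Dataset;
    import org.kitesdk.data.Datasets;
    import org.kitesdk.data.mapreduce.DatasetKeyOutputFormat;

    public final class KiteAppendSketch {
      public static void configureAppend(Configuration conf, String uri) {
        // e.g. uri = "dataset:hive:default/some_table" (illustrative only)
        if (Datasets.exists(uri)) {
          Dataset<GenericRecord> dataset = Datasets.load(uri);
          // Append new files to the existing dataset instead of failing.
          DatasetKeyOutputFormat.configure(conf).appendTo(dataset);
        }
      }
    }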
" + uri); } + + dataset = Datasets.load(uri); Schema writtenWith = dataset.getDescriptor().getSchema(); if (!SchemaValidationUtil.canRead(writtenWith, schema)) { throw new IOException( @@ -90,10 +92,14 @@ public final class ParquetJob { } conf.set(CONF_AVRO_SCHEMA, schema.toString()); - if (overwrite) { - DatasetKeyOutputFormat.configure(conf).overwrite(dataset); + DatasetKeyOutputFormat.ConfigBuilder builder = + DatasetKeyOutputFormat.configure(conf); + if (WriteMode.OVERWRITE.equals(writeMode)) { + builder.overwrite(dataset); + } else if (WriteMode.APPEND.equals(writeMode)) { + builder.appendTo(dataset); } else { - DatasetKeyOutputFormat.configure(conf).writeTo(dataset); + builder.writeTo(dataset); } } http://git-wip-us.apache.org/repos/asf/sqoop/blob/df3a81b5/src/test/com/cloudera/sqoop/TestParquetImport.java ---------------------------------------------------------------------- diff --git a/src/test/com/cloudera/sqoop/TestParquetImport.java b/src/test/com/cloudera/sqoop/TestParquetImport.java index 07e140a..ae2e617 100644 --- a/src/test/com/cloudera/sqoop/TestParquetImport.java +++ b/src/test/com/cloudera/sqoop/TestParquetImport.java @@ -204,7 +204,7 @@ public class TestParquetImport extends ImportJobTestCase { createTableWithColTypes(types, vals); runImport(getOutputArgv(true, null)); - runImportAgain(getOutputArgv(true, new String[]{"--append"})); + runImport(getOutputArgv(true, new String[]{"--append"})); DatasetReader<GenericRecord> reader = getReader(); try { @@ -226,7 +226,7 @@ public class TestParquetImport extends ImportJobTestCase { runImport(getOutputArgv(true, null)); try { - runImportAgain(getOutputArgv(true, null)); + runImport(getOutputArgv(true, null)); fail(""); } catch (IOException ex) { // ok http://git-wip-us.apache.org/repos/asf/sqoop/blob/df3a81b5/src/test/com/cloudera/sqoop/hive/TestHiveImport.java ---------------------------------------------------------------------- diff --git a/src/test/com/cloudera/sqoop/hive/TestHiveImport.java b/src/test/com/cloudera/sqoop/hive/TestHiveImport.java index fa717cb..b626964 100644 --- a/src/test/com/cloudera/sqoop/hive/TestHiveImport.java +++ b/src/test/com/cloudera/sqoop/hive/TestHiveImport.java @@ -20,23 +20,25 @@ package com.cloudera.sqoop.hive; import java.io.BufferedReader; import java.io.File; -import java.io.FileNotFoundException; import java.io.FileReader; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.List; +import org.apache.avro.generic.GenericRecord; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.junit.After; +import org.junit.Before; import org.junit.Test; import com.cloudera.sqoop.SqoopOptions; import com.cloudera.sqoop.SqoopOptions.InvalidOptionsException; import com.cloudera.sqoop.testutil.CommonArgs; -import com.cloudera.sqoop.testutil.HsqldbTestServer; import com.cloudera.sqoop.testutil.ImportJobTestCase; import com.cloudera.sqoop.tool.BaseSqoopTool; import com.cloudera.sqoop.tool.CodeGenTool; @@ -44,6 +46,9 @@ import com.cloudera.sqoop.tool.CreateHiveTableTool; import com.cloudera.sqoop.tool.ImportTool; import com.cloudera.sqoop.tool.SqoopTool; import org.apache.commons.cli.ParseException; +import org.kitesdk.data.Dataset; +import org.kitesdk.data.DatasetReader; +import org.kitesdk.data.Datasets; /** * Test HiveImport capability after an import to HDFS. 
@@ -53,11 +58,13 @@ public class TestHiveImport extends ImportJobTestCase {
   public static final Log LOG = LogFactory.getLog(
       TestHiveImport.class.getName());
 
+  @Before
   public void setUp() {
     super.setUp();
     HiveImport.setTestMode(true);
   }
 
+  @After
   public void tearDown() {
     super.tearDown();
     HiveImport.setTestMode(false);
@@ -272,11 +279,47 @@ public class TestHiveImport extends ImportJobTestCase {
     setNumCols(3);
     String [] types = { "VARCHAR(32)", "INTEGER", "CHAR(64)" };
     String [] vals = { "'test'", "42", "'somestring'" };
-    String [] args_array = getArgv(false, null);
-    ArrayList<String> args = new ArrayList<String>(Arrays.asList(args_array));
-    args.add("--as-parquetfile");
-    runImportTest(TABLE_NAME, types, vals, "normalImportAsParquet.q", args.toArray(new String[0]),
-        new ImportTool());
+    String [] extraArgs = {"--as-parquetfile"};
+
+    runImportTest(TABLE_NAME, types, vals, "", getArgv(false, extraArgs),
+        new ImportTool());
+    verifyHiveDataset(TABLE_NAME, new Object[][]{{"test", 42, "somestring"}});
+  }
+
+  private void verifyHiveDataset(String tableName, Object[][] valsArray) {
+    String datasetUri = String.format("dataset:hive:default/%s",
+        tableName.toLowerCase());
+    assertTrue(Datasets.exists(datasetUri));
+    Dataset dataset = Datasets.load(datasetUri);
+    assertFalse(dataset.isEmpty());
+
+    DatasetReader<GenericRecord> reader = dataset.newReader();
+    try {
+      List<String> expectations = new ArrayList<String>();
+      if (valsArray != null) {
+        for (Object[] vals : valsArray) {
+          expectations.add(Arrays.toString(vals));
+        }
+      }
+
+      while (reader.hasNext() && expectations.size() > 0) {
+        String actual = Arrays.toString(
+            convertGenericRecordToArray(reader.next()));
+        assertTrue("Expect record: " + actual, expectations.remove(actual));
+      }
+      assertFalse(reader.hasNext());
+      assertEquals(0, expectations.size());
+    } finally {
+      reader.close();
+    }
+  }
+
+  private static Object[] convertGenericRecordToArray(GenericRecord record) {
+    Object[] result = new Object[record.getSchema().getFields().size()];
+    for (int i = 0; i < result.length; i++) {
+      result[i] = record.get(i);
+    }
+    return result;
   }
 
   /** Test that table is created in hive with no data import. */
@@ -312,6 +355,53 @@ public class TestHiveImport extends ImportJobTestCase {
         new CreateHiveTableTool());
   }
 
+  /**
+   * Test that table is created in hive and replaces the existing table if
+   * any.
+   */
+  @Test
+  public void testCreateOverwriteHiveImportAsParquet() throws IOException {
+    final String TABLE_NAME = "CREATE_OVERWRITE_HIVE_IMPORT_AS_PARQUET";
+    setCurTableName(TABLE_NAME);
+    setNumCols(3);
+    String [] types = { "VARCHAR(32)", "INTEGER", "CHAR(64)" };
+    String [] vals = { "'test'", "42", "'somestring'" };
+    String [] extraArgs = {"--as-parquetfile"};
+    ImportTool tool = new ImportTool();
+
+    runImportTest(TABLE_NAME, types, vals, "", getArgv(false, extraArgs), tool);
+    verifyHiveDataset(TABLE_NAME, new Object[][]{{"test", 42, "somestring"}});
+
+    String [] valsToOverwrite = { "'test2'", "24", "'somestring2'" };
+    String [] extraArgsForOverwrite = {"--as-parquetfile", "--hive-overwrite"};
+    runImportTest(TABLE_NAME, types, valsToOverwrite, "",
+        getArgv(false, extraArgsForOverwrite), tool);
+    verifyHiveDataset(TABLE_NAME, new Object[][] {{"test2", 24, "somestring2"}});
+  }
+
+  /**
+   * Test that records are appended to an existing table.
+   */
+  @Test
+  public void testAppendHiveImportAsParquet() throws IOException {
+    final String TABLE_NAME = "APPEND_HIVE_IMPORT_AS_PARQUET";
+    setCurTableName(TABLE_NAME);
+    setNumCols(3);
+    String [] types = { "VARCHAR(32)", "INTEGER", "CHAR(64)" };
+    String [] vals = { "'test'", "42", "'somestring'" };
+    String [] extraArgs = {"--as-parquetfile"};
+    String [] args = getArgv(false, extraArgs);
+    ImportTool tool = new ImportTool();
+
+    runImportTest(TABLE_NAME, types, vals, "", args, tool);
+    verifyHiveDataset(TABLE_NAME, new Object[][]{{"test", 42, "somestring"}});
+
+    String [] valsToAppend = { "'test2'", "4242", "'somestring2'" };
+    runImportTest(TABLE_NAME, types, valsToAppend, "", args, tool);
+    verifyHiveDataset(TABLE_NAME, new Object[][] {
+        {"test2", 4242, "somestring2"}, {"test", 42, "somestring"}});
+  }
+
   /** Test that dates are coerced properly to strings. */
   @Test
   public void testDate() throws IOException {
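Since the generated-script comparison (normalImportAsParquet.q, deleted below) is gone, these tests verify results by reading the Hive-backed dataset through Kite, as verifyHiveDataset does above. Reading such a dataset back looks roughly like this sketch; the table name is made up:

    import org.apache.avro.generic.GenericRecord;
    import org.kitesdk.data.Dataset;
    import org.kitesdk.data.DatasetReader;
    import org.kitesdk.data.Datasets;

    public final class ReadHiveDatasetSketch {
      public static void main(String[] args) {
        // Hive tables appear as Kite datasets under "dataset:hive:<db>/<table>".
        Dataset<GenericRecord> dataset =
            Datasets.load("dataset:hive:default/example_table");
        DatasetReader<GenericRecord> reader = dataset.newReader();
        try {
          while (reader.hasNext()) {
            System.out.println(reader.next());  // one imported row per record
          }
        } finally {
          reader.close();
        }
      }
    }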
+   */
+  @Test
+  public void testAppendHiveImportAsParquet() throws IOException {

http://git-wip-us.apache.org/repos/asf/sqoop/blob/df3a81b5/src/test/com/cloudera/sqoop/testutil/BaseSqoopTestCase.java
----------------------------------------------------------------------
diff --git a/src/test/com/cloudera/sqoop/testutil/BaseSqoopTestCase.java b/src/test/com/cloudera/sqoop/testutil/BaseSqoopTestCase.java
index 7934791..e3098d6 100644
--- a/src/test/com/cloudera/sqoop/testutil/BaseSqoopTestCase.java
+++ b/src/test/com/cloudera/sqoop/testutil/BaseSqoopTestCase.java
@@ -244,10 +244,11 @@ public abstract class BaseSqoopTestCase extends TestCase {
       } catch (IOException e) {
         LOG.warn(e);
       }
-    }
-    File s = new File(getWarehouseDir());
-    if (!s.delete()) {
-      LOG.warn("Can't delete " + s.getPath());
+    } else {
+      File s = new File(getWarehouseDir());
+      if (!s.delete()) {
+        LOG.warn("Cannot delete " + s.getPath());
+      }
     }
   }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/df3a81b5/src/test/com/cloudera/sqoop/testutil/ImportJobTestCase.java
----------------------------------------------------------------------
diff --git a/src/test/com/cloudera/sqoop/testutil/ImportJobTestCase.java b/src/test/com/cloudera/sqoop/testutil/ImportJobTestCase.java
index 293bf10..08408a5 100644
--- a/src/test/com/cloudera/sqoop/testutil/ImportJobTestCase.java
+++ b/src/test/com/cloudera/sqoop/testutil/ImportJobTestCase.java
@@ -35,6 +35,7 @@ import com.cloudera.sqoop.orm.CompilationManager;
 import com.cloudera.sqoop.tool.SqoopTool;
 import com.cloudera.sqoop.tool.ImportTool;
 import com.cloudera.sqoop.util.ClassLoaderStack;
+import org.junit.Before;
 
 /**
  * Class that implements common methods required for tests which import data
@@ -45,6 +46,12 @@ public abstract class ImportJobTestCase extends BaseSqoopTestCase {
   public static final Log LOG = LogFactory.getLog(
       ImportJobTestCase.class.getName());
 
+  @Before
+  public void setUp() {
+    super.setUp();
+    removeTableDir();
+  }
+
   protected String getTablePrefix() {
     return "IMPORT_TABLE_";
   }
@@ -206,16 +213,6 @@ public abstract class ImportJobTestCase extends BaseSqoopTestCase {
    * execution).
    */
   protected void runImport(SqoopTool tool, String [] argv) throws IOException {
-    boolean cleanup = true;
-    runImport(cleanup, tool, argv);
-  }
-
-  private void runImport(boolean cleanup, SqoopTool tool,
-      String [] argv) throws IOException {
-    if (cleanup) {
-      removeTableDir();
-    }
-
     // run the tool through the normal entry-point.
     int ret;
     try {
@@ -242,10 +239,4 @@ public abstract class ImportJobTestCase extends BaseSqoopTestCase {
     runImport(new ImportTool(), argv);
   }
 
-  protected void runImportAgain(String[] argv)
-      throws IOException {
-    boolean cleanup = false;
-    runImport(cleanup, new ImportTool(), argv);
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/df3a81b5/testdata/hive/scripts/normalImportAsParquet.q
----------------------------------------------------------------------
diff --git a/testdata/hive/scripts/normalImportAsParquet.q b/testdata/hive/scripts/normalImportAsParquet.q
deleted file mode 100644
index e434e9b..0000000
--- a/testdata/hive/scripts/normalImportAsParquet.q
+++ /dev/null
@@ -1,17 +0,0 @@
--- Licensed to the Apache Software Foundation (ASF) under one
--- or more contributor license agreements. See the NOTICE file
--- distributed with this work for additional information
--- regarding copyright ownership. The ASF licenses this file
--- to you under the Apache License, Version 2.0 (the
--- "License"); you may not use this file except in compliance
--- with the License. You may obtain a copy of the License at
---
---   http://www.apache.org/licenses/LICENSE-2.0
---
--- Unless required by applicable law or agreed to in writing, software
--- distributed under the License is distributed on an "AS IS" BASIS,
--- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
--- See the License for the specific language governing permissions and
--- limitations under the License.
-CREATE TABLE IF NOT EXISTS `NORMAL_HIVE_IMPORT_AS_PARQUET` ( `DATA_COL0` STRING, `DATA_COL1` INT, `DATA_COL2` STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\001' LINES TERMINATED BY '\012' STORED AS PARQUET;
-LOAD DATA INPATH 'file:BASEPATH/sqoop/warehouse/NORMAL_HIVE_IMPORT_AS_PARQUET' INTO TABLE `NORMAL_HIVE_IMPORT_AS_PARQUET`;
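A note on the test-harness change above: cleanup moves out of runImport into a JUnit @Before hook, which is why runImportAgain could be deleted; calling runImport twice inside one test now genuinely re-imports into the existing target. The pattern, reduced to a sketch with hypothetical class and helper names:

    import org.junit.Before;
    import org.junit.Test;

    public class RepeatedImportPatternSketch {
      @Before
      public void setUp() {
        removeTableDir();  // wipe the target once per test method
      }

      @Test
      public void secondImportHitsExistingDataset() throws Exception {
        runImport();  // first run creates the dataset
        runImport();  // second run appends, overwrites, or fails, per write mode
      }

      private void removeTableDir() { /* delete the test table directory */ }

      private void runImport() { /* invoke the Sqoop import tool */ }
    }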
