PHOENIX-2538 - CsvBulkLoadTool should return non-zero exit status if import fails
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/55b7adad
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/55b7adad
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/55b7adad

Branch: refs/heads/4.x-HBase-1.0
Commit: 55b7adadda89cbd84fa7fb45067ecf3dbc78f29c
Parents: 1b3a4ca
Author: ravimagham <[email protected]>
Authored: Tue Dec 29 10:17:05 2015 -0800
Committer: ravimagham <[email protected]>
Committed: Tue Dec 29 10:17:05 2015 -0800

----------------------------------------------------------------------
 .../phoenix/mapreduce/CsvBulkLoadToolIT.java    | 65 +++++++++++++---
 .../phoenix/mapreduce/AbstractBulkLoadTool.java | 80 ++++++++++----------
 2 files changed, 94 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/55b7adad/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/CsvBulkLoadToolIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/CsvBulkLoadToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/CsvBulkLoadToolIT.java
index 7daacb4..2970d56 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/CsvBulkLoadToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/CsvBulkLoadToolIT.java
@@ -17,6 +17,14 @@
  */
 package org.apache.phoenix.mapreduce;
 
+import static org.apache.phoenix.query.BaseTest.setUpConfigForMiniCluster;
+import static org.apache.phoenix.query.QueryServices.DATE_FORMAT_ATTRIB;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 import java.io.PrintWriter;
 import java.sql.Connection;
 import java.sql.DriverManager;
@@ -28,6 +36,7 @@ import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.mapred.FileAlreadyExistsException;
 import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
 import org.apache.phoenix.jdbc.PhoenixDriver;
 import org.apache.phoenix.query.QueryServices;
@@ -36,18 +45,9 @@ import org.apache.phoenix.util.DateUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-import static org.apache.phoenix.query.BaseTest.setUpConfigForMiniCluster;
-import static org.apache.phoenix.query.QueryServices.DATE_FORMAT_ATTRIB;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
 @Category(NeedsOwnMiniClusterTest.class)
 public class CsvBulkLoadToolIT {
 
@@ -321,4 +321,51 @@ public class CsvBulkLoadToolIT {
         stmt.close();
     }
 
+    @Test
+    public void testInvalidArguments() {
+        String tableName = "TABLE8";
+        CsvBulkLoadTool csvBulkLoadTool = new CsvBulkLoadTool();
+        csvBulkLoadTool.setConf(hbaseTestUtil.getConfiguration());
+        try {
+            csvBulkLoadTool.run(new String[] {
+                "--input", "/tmp/input4.csv",
+                "--table", tableName,
+                "--zookeeper", zkQuorum });
+            fail(String.format("Table %s not created, hence should fail",tableName));
+        } catch (Exception ex) {
+            assertTrue(ex instanceof IllegalArgumentException);
+            assertTrue(ex.getMessage().contains(String.format("Table %s not found", tableName)));
+        }
+    }
+
+    @Test
+    public void testAlreadyExistsOutputPath() {
+        String tableName = "TABLE9";
+        String outputPath = "/tmp/output/tabl9";
+        try {
+            Statement stmt = conn.createStatement();
+            stmt.execute("CREATE TABLE " + tableName + "(ID INTEGER NOT NULL PRIMARY KEY, "
+                    + "FIRST_NAME VARCHAR, LAST_NAME VARCHAR)");
+
+            FileSystem fs = FileSystem.get(hbaseTestUtil.getConfiguration());
+            fs.create(new Path(outputPath));
+            FSDataOutputStream outputStream = fs.create(new Path("/tmp/input9.csv"));
+            PrintWriter printWriter = new PrintWriter(outputStream);
+            printWriter.println("1,FirstName 1,LastName 1");
+            printWriter.println("2,FirstName 2,LastName 2");
+            printWriter.close();
+
+            CsvBulkLoadTool csvBulkLoadTool = new CsvBulkLoadTool();
+            csvBulkLoadTool.setConf(hbaseTestUtil.getConfiguration());
+            csvBulkLoadTool.run(new String[] {
+                "--input", "/tmp/input9.csv",
+                "--output", outputPath,
+                "--table", tableName,
+                "--zookeeper", zkQuorum });
+
+            fail(String.format("Output path %s already exists. hence, should fail",outputPath));
+        } catch (Exception ex) {
+            assertTrue(ex instanceof FileAlreadyExistsException);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/55b7adad/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
index 1d2594d..4b5d618 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
@@ -17,10 +17,14 @@
  */
 package org.apache.phoenix.mapreduce;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Splitter;
-import com.google.common.collect.Lists;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.HelpFormatter;
@@ -57,14 +61,10 @@ import org.apache.phoenix.util.StringUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.sql.Connection;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.ExecutionException;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
 
 /**
  * Base tool for running MapReduce-based ingests of data.
@@ -174,8 +174,7 @@ public abstract class AbstractBulkLoadTool extends Configured implements Tool {
         return loadData(conf, cmdLine);
     }
 
-    private int loadData(Configuration conf, CommandLine cmdLine) throws SQLException,
-            InterruptedException, ExecutionException, ClassNotFoundException {
+    private int loadData(Configuration conf, CommandLine cmdLine) throws Exception {
         String tableName = cmdLine.getOptionValue(TABLE_NAME_OPT.getOpt());
         String schemaName = cmdLine.getOptionValue(SCHEMA_NAME_OPT.getOpt());
         String indexTableName = cmdLine.getOptionValue(INDEX_TABLE_NAME_OPT.getOpt());
@@ -255,47 +254,44 @@ public abstract class AbstractBulkLoadTool extends Configured implements Tool {
     /**
      * Submits the jobs to the cluster.
      * Loads the HFiles onto the respective tables.
+     * @throws Exception
      */
     public int submitJob(final Configuration conf, final String qualifiedTableName,
-            final String inputPaths, final Path outputPath, List<TargetTableRef> tablesToBeLoaded) {
-        try {
-            Job job = new Job(conf, "Phoenix MapReduce import for " + qualifiedTableName);
-            FileInputFormat.addInputPaths(job, inputPaths);
-            FileOutputFormat.setOutputPath(job, outputPath);
+            final String inputPaths, final Path outputPath, List<TargetTableRef> tablesToBeLoaded) throws Exception {
+
+        Job job = Job.getInstance(conf, "Phoenix MapReduce import for " + qualifiedTableName);
+        FileInputFormat.addInputPaths(job, inputPaths);
+        FileOutputFormat.setOutputPath(job, outputPath);
 
-            job.setInputFormatClass(TextInputFormat.class);
-            job.setMapOutputKeyClass(TableRowkeyPair.class);
-            job.setMapOutputValueClass(KeyValue.class);
-            job.setOutputKeyClass(TableRowkeyPair.class);
-            job.setOutputValueClass(KeyValue.class);
-            job.setReducerClass(FormatToKeyValueReducer.class);
+        job.setInputFormatClass(TextInputFormat.class);
+        job.setMapOutputKeyClass(TableRowkeyPair.class);
+        job.setMapOutputValueClass(KeyValue.class);
+        job.setOutputKeyClass(TableRowkeyPair.class);
+        job.setOutputValueClass(KeyValue.class);
+        job.setReducerClass(FormatToKeyValueReducer.class);
 
-            MultiHfileOutputFormat.configureIncrementalLoad(job, tablesToBeLoaded);
+        MultiHfileOutputFormat.configureIncrementalLoad(job, tablesToBeLoaded);
 
-            final String tableNamesAsJson = TargetTableRefFunctions.NAMES_TO_JSON.apply(tablesToBeLoaded);
-            job.getConfiguration().set(FormatToKeyValueMapper.TABLE_NAMES_CONFKEY,tableNamesAsJson);
+        final String tableNamesAsJson = TargetTableRefFunctions.NAMES_TO_JSON.apply(tablesToBeLoaded);
+        job.getConfiguration().set(FormatToKeyValueMapper.TABLE_NAMES_CONFKEY,tableNamesAsJson);
 
-            // give subclasses their hook
-            setupJob(job);
+        // give subclasses their hook
+        setupJob(job);
 
-            LOG.info("Running MapReduce import job from {} to {}", inputPaths, outputPath);
-            boolean success = job.waitForCompletion(true);
-
-            if (success) {
-                LOG.info("Loading HFiles from {}", outputPath);
-                completebulkload(conf,outputPath,tablesToBeLoaded);
-            }
+        LOG.info("Running MapReduce import job from {} to {}", inputPaths, outputPath);
+        boolean success = job.waitForCompletion(true);
+        if (success) {
+            LOG.info("Loading HFiles from {}", outputPath);
+            completebulkload(conf,outputPath,tablesToBeLoaded);
             LOG.info("Removing output directory {}", outputPath);
-            if (!FileSystem.get(conf).delete(outputPath, true)) {
-                LOG.error("Removing output directory {} failed", outputPath);
+            if(!FileSystem.get(conf).delete(outputPath, true)) {
+                LOG.error("Failed to delete the output directory {}", outputPath);
             }
             return 0;
-        } catch(Exception e) {
-            LOG.error("Error occurred submitting BulkLoad ", e);
+        } else {
             return -1;
         }
-
     }
 
     private void completebulkload(Configuration conf,Path outputPath , List<TargetTableRef> tablesToBeLoaded) throws Exception {
