Repository: phoenix Updated Branches: refs/heads/master 29535f342 -> 6dcc88262
PHOENIX-2239 Improve strict mode in psql CSV load Enforce strict mode when individual rows can't be parsed or upserted. Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/88648372 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/88648372 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/88648372 Branch: refs/heads/master Commit: 886483728f0376dea748258348273b214382b535 Parents: 29535f3 Author: Gabriel Reid <gabri...@ngdata.com> Authored: Wed Sep 9 11:08:42 2015 +0200 Committer: Gabriel Reid <gr...@apache.org> Committed: Sat Sep 19 18:30:36 2015 +0200 ---------------------------------------------------------------------- .../phoenix/end2end/CSVCommonsLoaderIT.java | 40 +++++++++++++++++++- .../phoenix/mapreduce/CsvToKeyValueMapper.java | 8 ++-- .../apache/phoenix/util/CSVCommonsLoader.java | 15 ++++++-- .../phoenix/util/csv/CsvUpsertExecutor.java | 4 +- .../phoenix/util/csv/CsvUpsertExecutorTest.java | 5 ++- 5 files changed, 58 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/88648372/phoenix-core/src/it/java/org/apache/phoenix/end2end/CSVCommonsLoaderIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CSVCommonsLoaderIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CSVCommonsLoaderIT.java index c7287ea..b78bb63 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CSVCommonsLoaderIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CSVCommonsLoaderIT.java @@ -32,10 +32,12 @@ import java.util.Arrays; import java.util.Collections; import java.util.Properties; +import com.google.common.collect.ImmutableList; import org.apache.commons.csv.CSVParser; import org.apache.commons.csv.CSVRecord; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixTestDriver; +import org.apache.phoenix.schema.IllegalDataException; import org.apache.phoenix.schema.types.PInteger; import org.apache.phoenix.schema.types.PArrayDataType; import org.apache.phoenix.util.CSVCommonsLoader; @@ -403,6 +405,40 @@ public class CSVCommonsLoaderIT extends BaseHBaseManagedTimeIT { } } + // Ensure that strict mode also causes the import to stop if a data type on a single + // row is not correct + @Test + public void testCSVUpsertWithInvalidNumericalData_StrictMode() throws Exception { + CSVParser parser = null; + PhoenixConnection conn = null; + try { + // Create table + String statements = "CREATE TABLE IF NOT EXISTS " + STOCK_TABLE + + "(SYMBOL VARCHAR NOT NULL PRIMARY KEY, COMPANY_ID BIGINT);"; + conn = DriverManager.getConnection(getUrl()) + .unwrap(PhoenixConnection.class); + PhoenixRuntime.executeStatements(conn, + new StringReader(statements), null); + + // Upsert CSV file in strict mode + CSVCommonsLoader csvUtil = new CSVCommonsLoader(conn, STOCK_TABLE, + Arrays.asList("SYMBOL", "COMPANY_ID"), true); + try { + csvUtil.upsert(new StringReader(STOCK_CSV_VALUES)); + fail("Running an upsert with data that can't be upserted in strict mode " + + "should throw an exception"); + } catch (IllegalDataException e) { + // Expected + } + + } finally { + if (parser != null) + parser.close(); + if (conn != null) + conn.close(); + } + } + @Test public void testCSVUpsertWithAllColumn() throws Exception { CSVParser parser = null; @@ -631,7 +667,7 @@ public class CSVCommonsLoaderIT extends BaseHBaseManagedTimeIT { // Upsert CSV file CSVCommonsLoader csvUtil = new CSVCommonsLoader(conn, "ARRAY_TABLE", - null, true, ',', '"', null, "!"); + ImmutableList.<String>of(), true, ',', '"', null, "!"); csvUtil.upsert( new StringReader("ID,VALARRAY\n" + "1,2!3!4\n")); @@ -670,7 +706,7 @@ public class CSVCommonsLoaderIT extends BaseHBaseManagedTimeIT { // Upsert CSV file CSVCommonsLoader csvUtil = new CSVCommonsLoader(conn, "TS_TABLE", - null, true, ',', '"', null, "!"); + ImmutableList.<String>of(), true, ',', '"', null, "!"); csvUtil.upsert( new StringReader("ID,TS\n" + "1,1970-01-01 00:00:10\n" http://git-wip-us.apache.org/repos/asf/phoenix/blob/88648372/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java index c0328bd..68270d4 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java @@ -56,6 +56,7 @@ import com.google.common.base.Splitter; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; +import com.google.common.base.Throwables; /** * MapReduce mapper that converts CSV input lines into KeyValues that can be written to HFiles. @@ -250,12 +251,11 @@ public class CsvToKeyValueMapper extends Mapper<LongWritable,Text,ImmutableBytes } @Override - public void errorOnRecord(CSVRecord csvRecord, String errorMessage) { - LOG.error("Error on record {}: {}", csvRecord, errorMessage); + public void errorOnRecord(CSVRecord csvRecord, Throwable throwable) { + LOG.error("Error on record " + csvRecord, throwable); context.getCounter(COUNTER_GROUP_NAME, "Errors on records").increment(1L); if (!ignoreRecordErrors) { - throw new RuntimeException("Error on record, " + errorMessage + ", " + - "record =" + csvRecord); + throw Throwables.propagate(throwable); } } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/88648372/phoenix-core/src/main/java/org/apache/phoenix/util/CSVCommonsLoader.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/CSVCommonsLoader.java b/phoenix-core/src/main/java/org/apache/phoenix/util/CSVCommonsLoader.java index a82dc3c..b8b284a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/CSVCommonsLoader.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/CSVCommonsLoader.java @@ -19,6 +19,7 @@ package org.apache.phoenix.util; import com.google.common.base.Charsets; import com.google.common.base.Joiner; +import com.google.common.base.Throwables; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -219,7 +220,8 @@ public class CSVCommonsLoader { try { conn.setAutoCommit(false); long start = System.currentTimeMillis(); - CsvUpsertListener upsertListener = new CsvUpsertListener(conn, conn.getMutateBatchSize()); + CsvUpsertListener upsertListener = new CsvUpsertListener(conn, + conn.getMutateBatchSize(), isStrict); CsvUpsertExecutor csvUpsertExecutor = CsvUpsertExecutor.create(conn, tableName, columnInfoList, upsertListener, arrayElementSeparator); @@ -394,10 +396,12 @@ public class CSVCommonsLoader { private final PhoenixConnection conn; private final int upsertBatchSize; private long totalUpserts = 0L; + private final boolean strict; - CsvUpsertListener(PhoenixConnection conn, int upsertBatchSize) { + CsvUpsertListener(PhoenixConnection conn, int upsertBatchSize, boolean strict) { this.conn = conn; this.upsertBatchSize = upsertBatchSize; + this.strict = strict; } @Override @@ -417,8 +421,11 @@ public class CSVCommonsLoader { } @Override - public void errorOnRecord(CSVRecord csvRecord, String errorMessage) { - LOG.error("Error upserting record {}: {}", csvRecord, errorMessage); + public void errorOnRecord(CSVRecord csvRecord, Throwable throwable) { + LOG.error("Error upserting record " + csvRecord, throwable.getMessage()); + if (strict) { + throw Throwables.propagate(throwable); + } } /** http://git-wip-us.apache.org/repos/asf/phoenix/blob/88648372/phoenix-core/src/main/java/org/apache/phoenix/util/csv/CsvUpsertExecutor.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/csv/CsvUpsertExecutor.java b/phoenix-core/src/main/java/org/apache/phoenix/util/csv/CsvUpsertExecutor.java index 156c3a4..e680f5c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/csv/CsvUpsertExecutor.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/csv/CsvUpsertExecutor.java @@ -78,7 +78,7 @@ public class CsvUpsertExecutor implements Closeable { * * @param csvRecord the CSV record that was being upserted when the error occurred */ - void errorOnRecord(CSVRecord csvRecord, String errorMessage); + void errorOnRecord(CSVRecord csvRecord, Throwable throwable); } @@ -165,7 +165,7 @@ public class CsvUpsertExecutor implements Closeable { // listener, and it can do its own logging if needed LOG.debug("Error on CSVRecord " + csvRecord, e); } - upsertListener.errorOnRecord(csvRecord, e.getMessage()); + upsertListener.errorOnRecord(csvRecord, e); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/88648372/phoenix-core/src/test/java/org/apache/phoenix/util/csv/CsvUpsertExecutorTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/csv/CsvUpsertExecutorTest.java b/phoenix-core/src/test/java/org/apache/phoenix/util/csv/CsvUpsertExecutorTest.java index 84f44f3..6efe246 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/util/csv/CsvUpsertExecutorTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/util/csv/CsvUpsertExecutorTest.java @@ -40,6 +40,7 @@ import java.sql.SQLException; import java.sql.Types; import java.util.List; +import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; @@ -94,7 +95,7 @@ public class CsvUpsertExecutorTest extends BaseConnectionlessQueryTest { CSVRecord csvRecordWithTooFewFields = createCsvRecord("123,NameValue"); upsertExecutor.execute(csvRecordWithTooFewFields); - verify(upsertListener).errorOnRecord(eq(csvRecordWithTooFewFields), anyString()); + verify(upsertListener).errorOnRecord(eq(csvRecordWithTooFewFields), any(Throwable.class)); verifyNoMoreInteractions(upsertListener); } @@ -134,7 +135,7 @@ public class CsvUpsertExecutorTest extends BaseConnectionlessQueryTest { CSVRecord csvRecordWithInvalidType = createCsvRecord("123,NameValue,ThisIsNotANumber,1:2:3"); upsertExecutor.execute(csvRecordWithInvalidType); - verify(upsertListener).errorOnRecord(eq(csvRecordWithInvalidType), anyString()); + verify(upsertListener).errorOnRecord(eq(csvRecordWithInvalidType), any(Throwable.class)); verifyNoMoreInteractions(upsertListener); }