Repository: phoenix
Updated Branches:
  refs/heads/master 29535f342 -> 6dcc88262


PHOENIX-2239 Improve strict mode in psql CSV load

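Enforce strict mode when individual rows can't be parsed or
upserted. Previously such row-level errors were only logged by the
psql CSV loader; in strict mode the underlying exception is now
rethrown so that the load stops at the first bad row.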


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/88648372
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/88648372
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/88648372

Branch: refs/heads/master
Commit: 886483728f0376dea748258348273b214382b535
Parents: 29535f3
Author: Gabriel Reid <gabri...@ngdata.com>
Authored: Wed Sep 9 11:08:42 2015 +0200
Committer: Gabriel Reid <gr...@apache.org>
Committed: Sat Sep 19 18:30:36 2015 +0200

----------------------------------------------------------------------
 .../phoenix/end2end/CSVCommonsLoaderIT.java     | 40 +++++++++++++++++++-
 .../phoenix/mapreduce/CsvToKeyValueMapper.java  |  8 ++--
 .../apache/phoenix/util/CSVCommonsLoader.java   | 15 ++++++--
 .../phoenix/util/csv/CsvUpsertExecutor.java     |  4 +-
 .../phoenix/util/csv/CsvUpsertExecutorTest.java |  5 ++-
 5 files changed, 58 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/88648372/phoenix-core/src/it/java/org/apache/phoenix/end2end/CSVCommonsLoaderIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CSVCommonsLoaderIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CSVCommonsLoaderIT.java
index c7287ea..b78bb63 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CSVCommonsLoaderIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CSVCommonsLoaderIT.java
@@ -32,10 +32,12 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.Properties;
 
+import com.google.common.collect.ImmutableList;
 import org.apache.commons.csv.CSVParser;
 import org.apache.commons.csv.CSVRecord;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixTestDriver;
+import org.apache.phoenix.schema.IllegalDataException;
 import org.apache.phoenix.schema.types.PInteger;
 import org.apache.phoenix.schema.types.PArrayDataType;
 import org.apache.phoenix.util.CSVCommonsLoader;
@@ -403,6 +405,40 @@ public class CSVCommonsLoaderIT extends 
BaseHBaseManagedTimeIT {
         }
     }
 
+    // Ensure that strict mode also causes the import to stop if a data type on a single
+    // row is not correct
+    @Test
+    public void testCSVUpsertWithInvalidNumericalData_StrictMode() throws Exception {
+        CSVParser parser = null;
+        PhoenixConnection conn = null;
+        try {
+            // Create table
+            String statements = "CREATE TABLE IF NOT EXISTS " + STOCK_TABLE
+                    + "(SYMBOL VARCHAR NOT NULL PRIMARY KEY, COMPANY_ID BIGINT);";
+            conn = DriverManager.getConnection(getUrl())
+                    .unwrap(PhoenixConnection.class);
+            PhoenixRuntime.executeStatements(conn,
+                    new StringReader(statements), null);
+
+            // Upsert CSV file in strict mode
+            CSVCommonsLoader csvUtil = new CSVCommonsLoader(conn, STOCK_TABLE,
+                    Arrays.asList("SYMBOL", "COMPANY_ID"), true);
+            try {
+                csvUtil.upsert(new StringReader(STOCK_CSV_VALUES));
+                fail("Running an upsert with data that can't be upserted in strict mode "
+                        + "should throw an exception");
+            } catch (IllegalDataException e) {
+                // Expected
+            }
+
+        } finally {
+            if (parser != null)
+                parser.close();
+            if (conn != null)
+                conn.close();
+        }
+    }
+
     @Test
     public void testCSVUpsertWithAllColumn() throws Exception {
         CSVParser parser = null;
@@ -631,7 +667,7 @@ public class CSVCommonsLoaderIT extends 
BaseHBaseManagedTimeIT {
 
             // Upsert CSV file
             CSVCommonsLoader csvUtil = new CSVCommonsLoader(conn, 
"ARRAY_TABLE",
-                    null, true, ',', '"', null, "!");
+                    ImmutableList.<String>of(), true, ',', '"', null, "!");
             csvUtil.upsert(
                     new StringReader("ID,VALARRAY\n"
                             + "1,2!3!4\n"));
@@ -670,7 +706,7 @@ public class CSVCommonsLoaderIT extends 
BaseHBaseManagedTimeIT {
 
             // Upsert CSV file
             CSVCommonsLoader csvUtil = new CSVCommonsLoader(conn, "TS_TABLE",
-                    null, true, ',', '"', null, "!");
+                    ImmutableList.<String>of(), true, ',', '"', null, "!");
             csvUtil.upsert(
                     new StringReader("ID,TS\n"
                             + "1,1970-01-01 00:00:10\n"

http://git-wip-us.apache.org/repos/asf/phoenix/blob/88648372/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java
index c0328bd..68270d4 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java
@@ -56,6 +56,7 @@ import com.google.common.base.Splitter;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
+import com.google.common.base.Throwables;
 
 /**
  * MapReduce mapper that converts CSV input lines into KeyValues that can be 
written to HFiles.
@@ -250,12 +251,11 @@ public class CsvToKeyValueMapper extends 
Mapper<LongWritable,Text,ImmutableBytes
         }
 
         @Override
-        public void errorOnRecord(CSVRecord csvRecord, String errorMessage) {
-            LOG.error("Error on record {}: {}", csvRecord, errorMessage);
+        public void errorOnRecord(CSVRecord csvRecord, Throwable throwable) {
+            LOG.error("Error on record " + csvRecord, throwable);
             context.getCounter(COUNTER_GROUP_NAME, "Errors on 
records").increment(1L);
             if (!ignoreRecordErrors) {
-                throw new RuntimeException("Error on record, " + errorMessage 
+ ", " +
-                        "record =" + csvRecord);
+                throw Throwables.propagate(throwable);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/88648372/phoenix-core/src/main/java/org/apache/phoenix/util/CSVCommonsLoader.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/util/CSVCommonsLoader.java 
b/phoenix-core/src/main/java/org/apache/phoenix/util/CSVCommonsLoader.java
index a82dc3c..b8b284a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/CSVCommonsLoader.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/CSVCommonsLoader.java
@@ -19,6 +19,7 @@ package org.apache.phoenix.util;
 
 import com.google.common.base.Charsets;
 import com.google.common.base.Joiner;
+import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -219,7 +220,8 @@ public class CSVCommonsLoader {
         try {
             conn.setAutoCommit(false);
             long start = System.currentTimeMillis();
-            CsvUpsertListener upsertListener = new CsvUpsertListener(conn, 
conn.getMutateBatchSize());
+            CsvUpsertListener upsertListener = new CsvUpsertListener(conn,
+                    conn.getMutateBatchSize(), isStrict);
             CsvUpsertExecutor csvUpsertExecutor = 
CsvUpsertExecutor.create(conn, tableName,
                     columnInfoList, upsertListener, arrayElementSeparator);
 
@@ -394,10 +396,12 @@ public class CSVCommonsLoader {
         private final PhoenixConnection conn;
         private final int upsertBatchSize;
         private long totalUpserts = 0L;
+        private final boolean strict;
 
-        CsvUpsertListener(PhoenixConnection conn, int upsertBatchSize) {
+        CsvUpsertListener(PhoenixConnection conn, int upsertBatchSize, boolean 
strict) {
             this.conn = conn;
             this.upsertBatchSize = upsertBatchSize;
+            this.strict = strict;
         }
 
         @Override
@@ -417,8 +421,11 @@ public class CSVCommonsLoader {
         }
 
         @Override
-        public void errorOnRecord(CSVRecord csvRecord, String errorMessage) {
-            LOG.error("Error upserting record {}: {}", csvRecord, 
errorMessage);
+        public void errorOnRecord(CSVRecord csvRecord, Throwable throwable) {
+            LOG.error("Error upserting record " + csvRecord, throwable.getMessage());
+            if (strict) {
+                throw Throwables.propagate(throwable);
+            }
         }
 
         /**

http://git-wip-us.apache.org/repos/asf/phoenix/blob/88648372/phoenix-core/src/main/java/org/apache/phoenix/util/csv/CsvUpsertExecutor.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/util/csv/CsvUpsertExecutor.java 
b/phoenix-core/src/main/java/org/apache/phoenix/util/csv/CsvUpsertExecutor.java
index 156c3a4..e680f5c 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/util/csv/CsvUpsertExecutor.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/util/csv/CsvUpsertExecutor.java
@@ -78,7 +78,7 @@ public class CsvUpsertExecutor implements Closeable {
          *
          * @param csvRecord the CSV record that was being upserted when the 
error occurred
          */
-        void errorOnRecord(CSVRecord csvRecord, String errorMessage);
+        void errorOnRecord(CSVRecord csvRecord, Throwable throwable);
     }
 
 
@@ -165,7 +165,7 @@ public class CsvUpsertExecutor implements Closeable {
                 // listener, and it can do its own logging if needed
                 LOG.debug("Error on CSVRecord " + csvRecord, e);
             }
-            upsertListener.errorOnRecord(csvRecord, e.getMessage());
+            upsertListener.errorOnRecord(csvRecord, e);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/88648372/phoenix-core/src/test/java/org/apache/phoenix/util/csv/CsvUpsertExecutorTest.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/util/csv/CsvUpsertExecutorTest.java
 
b/phoenix-core/src/test/java/org/apache/phoenix/util/csv/CsvUpsertExecutorTest.java
index 84f44f3..6efe246 100644
--- 
a/phoenix-core/src/test/java/org/apache/phoenix/util/csv/CsvUpsertExecutorTest.java
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/util/csv/CsvUpsertExecutorTest.java
@@ -40,6 +40,7 @@ import java.sql.SQLException;
 import java.sql.Types;
 import java.util.List;
 
+import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
@@ -94,7 +95,7 @@ public class CsvUpsertExecutorTest extends 
BaseConnectionlessQueryTest {
         CSVRecord csvRecordWithTooFewFields = createCsvRecord("123,NameValue");
         upsertExecutor.execute(csvRecordWithTooFewFields);
 
-        verify(upsertListener).errorOnRecord(eq(csvRecordWithTooFewFields), 
anyString());
+        verify(upsertListener).errorOnRecord(eq(csvRecordWithTooFewFields), 
any(Throwable.class));
         verifyNoMoreInteractions(upsertListener);
     }
 
@@ -134,7 +135,7 @@ public class CsvUpsertExecutorTest extends 
BaseConnectionlessQueryTest {
         CSVRecord csvRecordWithInvalidType = 
createCsvRecord("123,NameValue,ThisIsNotANumber,1:2:3");
         upsertExecutor.execute(csvRecordWithInvalidType);
 
-        verify(upsertListener).errorOnRecord(eq(csvRecordWithInvalidType), 
anyString());
+        verify(upsertListener).errorOnRecord(eq(csvRecordWithInvalidType), 
any(Throwable.class));
         verifyNoMoreInteractions(upsertListener);
     }
 

Reply via email to