This is an automated email from the ASF dual-hosted git repository.

richardantal pushed a commit to branch 4.16
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.16 by this push:
     new 6b29b1d  PHOENIX-6405 Disallow bulk loading into non-empty tables with global secondary indexes
6b29b1d is described below

commit 6b29b1d348c1d9d303d792400b96c6e1208ffa69
Author: Richard Antal <antal97rich...@gmail.com>
AuthorDate: Tue Jul 13 17:15:59 2021 +0200

    PHOENIX-6405 Disallow bulk loading into non-empty tables with global secondary indexes

    Change-Id: I55e2f6138f69add7ffa028baab6d8ea80681acf0
---
 .../apache/phoenix/end2end/CsvBulkLoadToolIT.java  | 97 +++++++++++++++++++++-
 .../phoenix/mapreduce/AbstractBulkLoadTool.java    | 22 ++++-
 .../java/org/apache/phoenix/util/SchemaUtil.java   |  2 +-
 3 files changed, 118 insertions(+), 3 deletions(-)

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java
index 168f44b..7a4d1ca 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java
@@ -113,6 +113,101 @@ public class CsvBulkLoadToolIT extends BaseOwnClusterIT {
         rs.close();
         stmt.close();
     }
+
+    @Test
+    public void testImportWithGlobalIndex() throws Exception {
+
+        Statement stmt = conn.createStatement();
+        stmt.execute("CREATE TABLE S.TABLE1 (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, T DATE) SPLIT ON (1,2)");
+        stmt.execute("CREATE INDEX glob_idx ON S.TABLE1(ID, T)");
+        conn.commit();
+
+        FileSystem fs = FileSystem.get(getUtility().getConfiguration());
+        FSDataOutputStream outputStream = fs.create(new Path("/tmp/input1.csv"));
+        PrintWriter printWriter = new PrintWriter(outputStream);
+        printWriter.println("1,Name 1,1970/01/01");
+        printWriter.println("2,Name 2,1970/01/02");
+        printWriter.close();
+
+        fs = FileSystem.get(getUtility().getConfiguration());
+        outputStream = fs.create(new Path("/tmp/input2.csv"));
+        printWriter = new PrintWriter(outputStream);
+        printWriter.println("3,Name 3,1970/01/03");
+        printWriter.println("4,Name 4,1970/01/04");
+        printWriter.close();
+
+        CsvBulkLoadTool csvBulkLoadTool = new CsvBulkLoadTool();
+        csvBulkLoadTool.setConf(new Configuration(getUtility().getConfiguration()));
+        csvBulkLoadTool.getConf().set(DATE_FORMAT_ATTRIB,"yyyy/MM/dd");
+        int exitCode = csvBulkLoadTool.run(new String[] {
+                "--input", "/tmp/input1.csv",
+                "--table", "table1",
+                "--schema", "s",
+                "--zookeeper", zkQuorum});
+        assertEquals(0, exitCode);
+
+        csvBulkLoadTool = new CsvBulkLoadTool();
+        csvBulkLoadTool.setConf(new Configuration(getUtility().getConfiguration()));
+        csvBulkLoadTool.getConf().set(DATE_FORMAT_ATTRIB,"yyyy/MM/dd");
+        try {
+            exitCode = csvBulkLoadTool.run(new String[] {
+                    "--input", "/tmp/input2.csv",
+                    "--table", "table1",
+                    "--schema", "s",
+                    "--zookeeper", zkQuorum});
+            fail("Bulk loading error should have happened earlier");
+        } catch (Exception e){
+            assertTrue(e.getMessage().contains("Bulk Loading error: Bulk loading is disabled for "
+                    + "non empty tables with global indexes, because it will corrupt "
+                    + "the global index table in most cases.\n"
+                    + "Use the --corruptindexes option to override this check."));
+        }
+
+        ResultSet rs = stmt.executeQuery("SELECT id, name, t FROM s.table1 ORDER BY id");
+        assertTrue(rs.next());
+        assertEquals(1, rs.getInt(1));
+        assertEquals("Name 1", rs.getString(2));
+        assertEquals(DateUtil.parseDate("1970-01-01"), rs.getDate(3));
+        assertTrue(rs.next());
+        assertEquals(2, rs.getInt(1));
+        assertEquals("Name 2", rs.getString(2));
+        assertEquals(DateUtil.parseDate("1970-01-02"), rs.getDate(3));
+        assertFalse(rs.next());
+
+        csvBulkLoadTool = new CsvBulkLoadTool();
+        csvBulkLoadTool.setConf(new Configuration(getUtility().getConfiguration()));
+        csvBulkLoadTool.getConf().set(DATE_FORMAT_ATTRIB,"yyyy/MM/dd");
+        exitCode = csvBulkLoadTool.run(new String[] {
+                "--input", "/tmp/input2.csv",
+                "--table", "table1",
+                "--schema", "s",
+                "--zookeeper", zkQuorum,
+                "--corruptindexes"});
+        assertEquals(0, exitCode);
+
+        rs = stmt.executeQuery("SELECT id, name, t FROM s.table1 ORDER BY id");
+        assertTrue(rs.next());
+        assertEquals(1, rs.getInt(1));
+        assertEquals("Name 1", rs.getString(2));
+        assertEquals(DateUtil.parseDate("1970-01-01"), rs.getDate(3));
+        assertTrue(rs.next());
+        assertEquals(2, rs.getInt(1));
+        assertEquals("Name 2", rs.getString(2));
+        assertEquals(DateUtil.parseDate("1970-01-02"), rs.getDate(3));
+        assertTrue(rs.next());
+        assertEquals(3, rs.getInt(1));
+        assertEquals("Name 3", rs.getString(2));
+        assertEquals(DateUtil.parseDate("1970-01-03"), rs.getDate(3));
+        assertTrue(rs.next());
+        assertEquals(4, rs.getInt(1));
+        assertEquals("Name 4", rs.getString(2));
+        assertEquals(DateUtil.parseDate("1970-01-04"), rs.getDate(3));
+        assertFalse(rs.next());
+
+        rs.close();
+        stmt.close();
+    }
+
     @Test
     public void testImportWithRowTimestamp() throws Exception {
@@ -443,7 +538,7 @@ public class CsvBulkLoadToolIT extends BaseOwnClusterIT {
             checkIndexTableIsVerified(indexTableName);
         }
     }
-    
+
     @Test
     public void testInvalidArguments() {
         String tableName = "TABLE8";
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
index bb1a343..686b611 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
@@ -85,6 +85,7 @@ public abstract class AbstractBulkLoadTool extends Configured implements Tool {
     static final Option IGNORE_ERRORS_OPT = new Option("g", "ignore-errors", false, "Ignore input errors");
     static final Option HELP_OPT = new Option("h", "help", false, "Show this help and quit");
     static final Option SKIP_HEADER_OPT = new Option("k", "skip-header", false, "Skip the first line of CSV files (the header)");
+    static final Option ENABLE_CORRUPT_INDEXES = new Option( "corruptindexes", "corruptindexes", false, "Allow bulk loading into non-empty tables with global secondary indexes");
 
     /**
      * Set configuration values based on parsed command line options.
@@ -109,6 +110,7 @@ public abstract class AbstractBulkLoadTool extends Configured implements Tool {
         options.addOption(IGNORE_ERRORS_OPT);
         options.addOption(HELP_OPT);
         options.addOption(SKIP_HEADER_OPT);
+        options.addOption(ENABLE_CORRUPT_INDEXES);
         return options;
     }
 
@@ -223,6 +225,12 @@ public abstract class AbstractBulkLoadTool extends Configured implements Tool {
         configureOptions(cmdLine, importColumns, conf);
         String sName = SchemaUtil.normalizeIdentifier(schemaName);
         String tName = SchemaUtil.normalizeIdentifier(tableName);
+
+        String tn = SchemaUtil.getEscapedTableName(sName, tName);
+        ResultSet rsempty = conn.createStatement().executeQuery("SELECT * FROM " + tn + " LIMIT 1");
+        boolean tableNotEmpty = rsempty.next();
+        rsempty.close();
+
         try {
             validateTable(conn, sName, tName);
         } finally {
@@ -241,14 +249,26 @@ public abstract class AbstractBulkLoadTool extends Configured implements Tool {
         PTable table = PhoenixRuntime.getTable(conn, qualifiedTableName);
         tablesToBeLoaded.add(new TargetTableRef(qualifiedTableName, table.getPhysicalName().getString()));
         boolean hasLocalIndexes = false;
+        boolean hasGlobalIndexes = false;
         for(PTable index: table.getIndexes()) {
             if (index.getIndexType() == IndexType.LOCAL) {
                 hasLocalIndexes =
                         qualifiedIndexTableName == null ? true : index.getTableName().getString()
                                 .equals(qualifiedIndexTableName);
-                if (hasLocalIndexes) break;
+                if (hasLocalIndexes && hasGlobalIndexes) break;
+            }
+            if (index.getIndexType() == IndexType.GLOBAL) {
+                hasGlobalIndexes = true;
+                if (hasLocalIndexes && hasGlobalIndexes) break;
             }
         }
+
+        if(hasGlobalIndexes && tableNotEmpty && !cmdLine.hasOption(ENABLE_CORRUPT_INDEXES.getOpt())){
+            throw new IllegalStateException("Bulk Loading error: Bulk loading is disabled for non"
+                    + " empty tables with global indexes, because it will corrupt the global index table in most cases.\n"
+                    + "Use the --corruptindexes option to override this check.");
+        }
+
         // using conn after it's been closed... o.O
         tablesToBeLoaded.addAll(getIndexTables(conn, qualifiedTableName));
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
index 49961f4..571da1e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
@@ -671,7 +671,7 @@ public class SchemaUtil {
         if (schemaName == null || schemaName.length() == 0) {
             return "\"" + tableName + "\"";
         }
-        return "\"" + schemaName + "\"." + "\"" + tableName + "\"";
+        return "\"" + schemaName + "\"" + QueryConstants.NAME_SEPARATOR + "\"" + tableName + "\"";
     }
 
     protected static PhoenixConnection addMetaDataColumn(PhoenixConnection conn, long scn, String columnDef) throws SQLException {
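
Usage note: with this change, AbstractBulkLoadTool throws the IllegalStateException shown above whenever the target table already contains rows and has a global index, and the new --corruptindexes option is the only way to bypass that check. Below is a minimal sketch of a programmatic invocation, modeled on the integration test in this commit; the input path, table and schema names, and ZooKeeper quorum are placeholders, not values from this commit.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.phoenix.mapreduce.CsvBulkLoadTool;

    public class BulkLoadWithGlobalIndexExample {
        public static void main(String[] args) throws Exception {
            CsvBulkLoadTool tool = new CsvBulkLoadTool();
            tool.setConf(new Configuration());
            // Without "--corruptindexes", a non-empty target table that has a
            // global index makes run() fail with the IllegalStateException
            // added in AbstractBulkLoadTool above.
            int exitCode = tool.run(new String[] {
                    "--input", "/tmp/input.csv",      // placeholder input file
                    "--table", "table1",              // placeholder table
                    "--schema", "s",                  // placeholder schema
                    "--zookeeper", "localhost:2181",  // placeholder quorum
                    "--corruptindexes"});             // explicitly accept the index-corruption risk
            System.exit(exitCode);
        }
    }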