PHOENIX-4238 MR IndexScrutinyTool breaks with salted tables and indexes on views
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/2a867a84 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/2a867a84 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/2a867a84 Branch: refs/heads/4.x-HBase-0.98 Commit: 2a867a84a804fdf5177f4061b9599a857a2d0f8e Parents: 0f5e4a1 Author: James Taylor <[email protected]> Authored: Fri Sep 29 14:04:35 2017 -0700 Committer: James Taylor <[email protected]> Committed: Fri Sep 29 14:05:42 2017 -0700 ---------------------------------------------------------------------- .../phoenix/end2end/IndexScrutinyToolIT.java | 93 ++++++++++++++++---- .../mapreduce/util/IndexColumnNames.java | 16 +++- 2 files changed, 88 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/2a867a84/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolIT.java index f868cef..f2384ec 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolIT.java @@ -28,14 +28,20 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Timestamp; import java.util.Arrays; +import java.util.Collection; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Random; +import java.util.TreeSet; import java.util.UUID; +import com.google.common.collect.Sets; +import org.apache.commons.io.IOUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileStatus; import 
org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.mapreduce.Counters; @@ -62,21 +68,23 @@ import org.junit.experimental.categories.Category; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; /** * Tests for the {@link IndexScrutinyTool} */ @Category(NeedsOwnMiniClusterTest.class) +@RunWith(Parameterized.class) public class IndexScrutinyToolIT extends BaseTest { - private static final String DATA_TABLE_DDL = - "CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER, EMPLOY_DATE TIMESTAMP, EMPLOYER VARCHAR)"; - - private static final String INDEX_TABLE_DDL = "CREATE INDEX %s ON %s (NAME, EMPLOY_DATE) INCLUDE (ZIP)"; + private String dataTableDdl; + private String indexTableDdl; private static final String UPSERT_SQL = "UPSERT INTO %s VALUES(?,?,?,?)"; - private static final String INDEX_UPSERT_SQL = "UPSERT INTO %s (\"0:NAME\", \":ID\", \"0:ZIP\", \"0:EMPLOY_DATE\") values (?,?,?,?)"; + private static final String INDEX_UPSERT_SQL = + "UPSERT INTO %s (\"0:NAME\", \":ID\", \"0:ZIP\", \"0:EMPLOY_DATE\") values (?,?,?,?)"; private static final String DELETE_SQL = "DELETE FROM %s "; @@ -95,6 +103,18 @@ public class IndexScrutinyToolIT extends BaseTest { private long testTime; + @Parameterized.Parameters + public static Collection<Object[]> data() { + return Arrays.asList(new Object[][] { + { "CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER, EMPLOY_DATE TIMESTAMP, EMPLOYER VARCHAR)", "CREATE LOCAL INDEX %s ON %s (NAME, EMPLOY_DATE) INCLUDE (ZIP)" }, + { "CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER, EMPLOY_DATE TIMESTAMP, EMPLOYER VARCHAR) SALT_BUCKETS=2", "CREATE INDEX %s ON %s (NAME, EMPLOY_DATE) INCLUDE (ZIP)" } }); + } + + public IndexScrutinyToolIT(String dataTableDdl, String indexTableDdl) { + this.dataTableDdl = dataTableDdl; + 
this.indexTableDdl = indexTableDdl; + } + @BeforeClass public static void doSetup() throws Exception { Map<String, String> props = Maps.newHashMap(); @@ -107,9 +127,9 @@ public class IndexScrutinyToolIT extends BaseTest { @Before public void setup() throws SQLException { generateUniqueTableNames(); - createTestTable(getUrl(), String.format(DATA_TABLE_DDL, dataTableFullName)); + createTestTable(getUrl(), String.format(dataTableDdl, dataTableFullName)); createTestTable(getUrl(), - String.format(INDEX_TABLE_DDL, indexTableName, dataTableFullName)); + String.format(indexTableDdl, indexTableName, dataTableFullName)); Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); conn = DriverManager.getConnection(getUrl(), props); String dataTableUpsert = String.format(UPSERT_SQL, dataTableFullName); @@ -340,17 +360,41 @@ public class IndexScrutinyToolIT extends BaseTest { // check the output files Path outputPath = CsvBulkImportUtil.getOutputPath(new Path(outputDir), dataTableFullName); DistributedFileSystem fs = getUtility().getDFSCluster().getFileSystem(); - Path outputFilePath = new Path(outputPath, "part-m-00000"); + List<Path> paths = Lists.newArrayList(); + Path firstPart = null; + for (FileStatus outputFile : fs.listStatus(outputPath)) { + if (outputFile.getPath().getName().startsWith("part")) { + if (firstPart == null) { + firstPart = outputFile.getPath(); + } else { + paths.add(outputFile.getPath()); + } + } + } + if (dataTableDdl.contains("SALT_BUCKETS")) { + fs.concat(firstPart, paths.toArray(new Path[0])); + } + Path outputFilePath = firstPart; assertTrue(fs.exists(outputFilePath)); FSDataInputStream fsDataInputStream = fs.open(outputFilePath); BufferedReader reader = new BufferedReader(new InputStreamReader(fsDataInputStream)); + TreeSet<String> lines = Sets.newTreeSet(); try { - assertEquals("[2, name-2, " + new Timestamp(testTime).toString() + ", 95123]\t[2, name-2, " + new Timestamp(testTime).toString() + ", 9999]", reader.readLine()); - assertEquals("[3, 
name-3, " + new Timestamp(testTime).toString() + ", 95123]\tTarget row not found", reader.readLine()); + String line = null; + while ((line = reader.readLine()) != null) { + lines.add(line); + } } finally { - reader.close(); - fsDataInputStream.close(); + IOUtils.closeQuietly(reader); + IOUtils.closeQuietly(fsDataInputStream); } + Iterator<String> lineIterator = lines.iterator(); + assertEquals( + "[2, name-2, " + new Timestamp(testTime).toString() + ", 95123]\t[2, name-2, " + new Timestamp(testTime) + .toString() + ", 9999]", lineIterator.next()); + assertEquals("[3, name-3, " + new Timestamp(testTime).toString() + ", 95123]\tTarget row not found", + lineIterator.next()); + } /** @@ -415,7 +459,12 @@ public class IndexScrutinyToolIT extends BaseTest { scrutinyTimeMillis); ResultSet rs = conn.createStatement().executeQuery(invalidRowsQuery); assertTrue(rs.next()); - assertFalse(rs.next()); + if (dataTableDdl.contains("SALT_BUCKETS")) { + assertTrue(rs.next()); + assertFalse(rs.next()); + } else { + assertFalse(rs.next()); + } } private SourceTargetColumnNames getColNames() throws SQLException { @@ -452,10 +501,17 @@ public class IndexScrutinyToolIT extends BaseTest { indexTableFullName, scrutinyTimeMillis); assertTrue(metadataRs.next()); List<? 
extends Object> expected = - Arrays.asList(dataTableFullName, indexTableFullName, scrutinyTimeMillis, - SourceTable.DATA_TABLE_SOURCE.name(), Arrays.toString(argValues), 3L, 0L, 1L, - 2L, 1L, 1L, "[\"ID\" INTEGER, \"NAME\" VARCHAR, \"EMPLOY_DATE\" TIMESTAMP, \"ZIP\" INTEGER]", - "[\":ID\" INTEGER, \"0:NAME\" VARCHAR, \"0:EMPLOY_DATE\" DECIMAL, \"0:ZIP\" INTEGER]", invalidRowsQuery); + Arrays.asList(dataTableFullName, indexTableFullName, scrutinyTimeMillis, + SourceTable.DATA_TABLE_SOURCE.name(), Arrays.toString(argValues), 3L, 0L, 1L, + 2L, 1L, 1L, "[\"ID\" INTEGER, \"NAME\" VARCHAR, \"EMPLOY_DATE\" TIMESTAMP, \"ZIP\" INTEGER]", + "[\":ID\" INTEGER, \"0:NAME\" VARCHAR, \"0:EMPLOY_DATE\" DECIMAL, \"0:ZIP\" INTEGER]", invalidRowsQuery); + if (dataTableDdl.contains("SALT_BUCKETS")) { + expected = Arrays.asList(dataTableFullName, indexTableFullName, scrutinyTimeMillis, + SourceTable.DATA_TABLE_SOURCE.name(), Arrays.toString(argValues), 3L, 0L, 1L, + 2L, 1L, 2L, "[\"ID\" INTEGER, \"NAME\" VARCHAR, \"EMPLOY_DATE\" TIMESTAMP, \"ZIP\" INTEGER]", + "[\":ID\" INTEGER, \"0:NAME\" VARCHAR, \"0:EMPLOY_DATE\" DECIMAL, \"0:ZIP\" INTEGER]", invalidRowsQuery); + } + assertRsValues(metadataRs, expected); String missingTargetQuery = metadataRs.getString("INVALID_ROWS_QUERY_MISSING_TARGET"); rs = conn.createStatement().executeQuery(missingTargetQuery); @@ -485,8 +541,7 @@ public class IndexScrutinyToolIT extends BaseTest { } private int countRows(String tableFullName) throws SQLException { - ResultSet count = - conn.createStatement().executeQuery("select count(*) from " + tableFullName); + ResultSet count = conn.createStatement().executeQuery("select count(*) from " + tableFullName); count.next(); int numRows = count.getInt(1); return numRows; http://git-wip-us.apache.org/repos/asf/phoenix/blob/2a867a84/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/IndexColumnNames.java ---------------------------------------------------------------------- diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/IndexColumnNames.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/IndexColumnNames.java index 5daa1ed..6f2959f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/IndexColumnNames.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/IndexColumnNames.java @@ -52,7 +52,19 @@ public class IndexColumnNames { this.pdataTable = pdataTable; this.pindexTable = pindexTable; List<PColumn> pindexCols = pindexTable.getColumns(); + List<PColumn> pkColumns = pindexTable.getPKColumns(); Set<String> indexColsAdded = new HashSet<String>(); + int offset = 0; + if (pindexTable.getBucketNum() != null) { + offset++; + } + if (pindexTable.getViewIndexId() != null) { + offset++; + } + if (offset > 0) { + pindexCols = pindexCols.subList(offset, pindexCols.size()); + pkColumns = pkColumns.subList(offset, pkColumns.size()); + } // first add the data pk columns for (PColumn indexCol : pindexCols) { @@ -68,7 +80,7 @@ public class IndexColumnNames { } // then the rest of the index pk columns - for (PColumn indexPkCol : pindexTable.getPKColumns()) { + for (PColumn indexPkCol : pkColumns) { String indexColName = indexPkCol.getName().getString(); if (!indexColsAdded.contains(indexColName)) { indexPkColNames.add(indexColName); @@ -81,7 +93,7 @@ public class IndexColumnNames { } // then the covered columns (rest of the columns) - for (PColumn indexCol : pindexTable.getColumns()) { + for (PColumn indexCol : pindexCols) { String indexColName = indexCol.getName().getString(); if (!indexColsAdded.contains(indexColName)) { indexNonPkColNames.add(indexColName);
