PHOENIX-4238 MR IndexScrutinyTool break with salted tables and indexes on views


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/2a867a84
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/2a867a84
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/2a867a84

Branch: refs/heads/4.x-HBase-0.98
Commit: 2a867a84a804fdf5177f4061b9599a857a2d0f8e
Parents: 0f5e4a1
Author: James Taylor <[email protected]>
Authored: Fri Sep 29 14:04:35 2017 -0700
Committer: James Taylor <[email protected]>
Committed: Fri Sep 29 14:05:42 2017 -0700

----------------------------------------------------------------------
 .../phoenix/end2end/IndexScrutinyToolIT.java    | 93 ++++++++++++++++----
 .../mapreduce/util/IndexColumnNames.java        | 16 +++-
 2 files changed, 88 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/2a867a84/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolIT.java
index f868cef..f2384ec 100644
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolIT.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolIT.java
@@ -28,14 +28,20 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Timestamp;
 import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Random;
+import java.util.TreeSet;
 import java.util.UUID;
 
+import com.google.common.collect.Sets;
+import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.mapreduce.Counters;
@@ -62,21 +68,23 @@ import org.junit.experimental.categories.Category;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 /**
  * Tests for the {@link IndexScrutinyTool}
  */
 @Category(NeedsOwnMiniClusterTest.class)
+@RunWith(Parameterized.class)
 public class IndexScrutinyToolIT extends BaseTest {
 
-    private static final String DATA_TABLE_DDL =
-            "CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, 
ZIP INTEGER, EMPLOY_DATE TIMESTAMP, EMPLOYER VARCHAR)";
-
-    private static final String INDEX_TABLE_DDL = "CREATE INDEX %s ON %s 
(NAME, EMPLOY_DATE) INCLUDE (ZIP)";
+    private String dataTableDdl;
+    private String indexTableDdl;
 
     private static final String UPSERT_SQL = "UPSERT INTO %s VALUES(?,?,?,?)";
 
-    private static final String INDEX_UPSERT_SQL = "UPSERT INTO %s 
(\"0:NAME\", \":ID\", \"0:ZIP\", \"0:EMPLOY_DATE\") values (?,?,?,?)";
+    private static final String INDEX_UPSERT_SQL =
+        "UPSERT INTO %s (\"0:NAME\", \":ID\", \"0:ZIP\", \"0:EMPLOY_DATE\") 
values (?,?,?,?)";
 
     private static final String DELETE_SQL = "DELETE FROM %s ";
 
@@ -95,6 +103,18 @@ public class IndexScrutinyToolIT extends BaseTest {
 
     private long testTime;
 
+    @Parameterized.Parameters
+    public static Collection<Object[]> data() {
+        return Arrays.asList(new Object[][] {
+            { "CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, 
ZIP INTEGER, EMPLOY_DATE TIMESTAMP, EMPLOYER VARCHAR)", "CREATE LOCAL INDEX %s 
ON %s (NAME, EMPLOY_DATE) INCLUDE (ZIP)" },
+            { "CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, 
ZIP INTEGER, EMPLOY_DATE TIMESTAMP, EMPLOYER VARCHAR) SALT_BUCKETS=2", "CREATE 
INDEX %s ON %s (NAME, EMPLOY_DATE) INCLUDE (ZIP)" } });
+    }
+
+    public IndexScrutinyToolIT(String dataTableDdl, String indexTableDdl) {
+        this.dataTableDdl = dataTableDdl;
+        this.indexTableDdl = indexTableDdl;
+    }
+
     @BeforeClass
     public static void doSetup() throws Exception {
         Map<String, String> props = Maps.newHashMap();
@@ -107,9 +127,9 @@ public class IndexScrutinyToolIT extends BaseTest {
     @Before
     public void setup() throws SQLException {
         generateUniqueTableNames();
-        createTestTable(getUrl(), String.format(DATA_TABLE_DDL, 
dataTableFullName));
+        createTestTable(getUrl(), String.format(dataTableDdl, 
dataTableFullName));
         createTestTable(getUrl(),
-            String.format(INDEX_TABLE_DDL, indexTableName, dataTableFullName));
+            String.format(indexTableDdl, indexTableName, dataTableFullName));
         Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
         conn = DriverManager.getConnection(getUrl(), props);
         String dataTableUpsert = String.format(UPSERT_SQL, dataTableFullName);
@@ -340,17 +360,41 @@ public class IndexScrutinyToolIT extends BaseTest {
         // check the output files
         Path outputPath = CsvBulkImportUtil.getOutputPath(new Path(outputDir), 
dataTableFullName);
         DistributedFileSystem fs = 
getUtility().getDFSCluster().getFileSystem();
-        Path outputFilePath = new Path(outputPath, "part-m-00000");
+        List<Path> paths = Lists.newArrayList();
+        Path firstPart = null;
+        for (FileStatus outputFile : fs.listStatus(outputPath)) {
+            if (outputFile.getPath().getName().startsWith("part")) {
+                if (firstPart == null) {
+                    firstPart = outputFile.getPath();
+                } else {
+                    paths.add(outputFile.getPath());
+                }
+            }
+        }
+        if (dataTableDdl.contains("SALT_BUCKETS")) {
+            fs.concat(firstPart, paths.toArray(new Path[0]));
+        }
+        Path outputFilePath = firstPart;
         assertTrue(fs.exists(outputFilePath));
         FSDataInputStream fsDataInputStream = fs.open(outputFilePath);
         BufferedReader reader = new BufferedReader(new 
InputStreamReader(fsDataInputStream));
+        TreeSet<String> lines = Sets.newTreeSet();
         try {
-            assertEquals("[2, name-2, " + new Timestamp(testTime).toString() + 
", 95123]\t[2, name-2, " + new Timestamp(testTime).toString() + ", 9999]", 
reader.readLine());
-            assertEquals("[3, name-3, " + new Timestamp(testTime).toString() + 
", 95123]\tTarget row not found", reader.readLine());
+            String line = null;
+            while ((line = reader.readLine()) != null) {
+                lines.add(line);
+            }
         } finally {
-            reader.close();
-            fsDataInputStream.close();
+            IOUtils.closeQuietly(reader);
+            IOUtils.closeQuietly(fsDataInputStream);
         }
+        Iterator<String> lineIterator = lines.iterator();
+        assertEquals(
+            "[2, name-2, " + new Timestamp(testTime).toString() + ", 
95123]\t[2, name-2, " + new Timestamp(testTime)
+                .toString() + ", 9999]", lineIterator.next());
+        assertEquals("[3, name-3, " + new Timestamp(testTime).toString() + ", 
95123]\tTarget row not found",
+            lineIterator.next());
+
     }
 
     /**
@@ -415,7 +459,12 @@ public class IndexScrutinyToolIT extends BaseTest {
                     scrutinyTimeMillis);
         ResultSet rs = conn.createStatement().executeQuery(invalidRowsQuery);
         assertTrue(rs.next());
-        assertFalse(rs.next());
+        if (dataTableDdl.contains("SALT_BUCKETS")) {
+            assertTrue(rs.next());
+            assertFalse(rs.next());
+        } else {
+            assertFalse(rs.next());
+        }
     }
 
     private SourceTargetColumnNames getColNames() throws SQLException {
@@ -452,10 +501,17 @@ public class IndexScrutinyToolIT extends BaseTest {
                     indexTableFullName, scrutinyTimeMillis);
         assertTrue(metadataRs.next());
         List<? extends Object> expected =
-                Arrays.asList(dataTableFullName, indexTableFullName, 
scrutinyTimeMillis,
-                    SourceTable.DATA_TABLE_SOURCE.name(), 
Arrays.toString(argValues), 3L, 0L, 1L,
-                    2L, 1L, 1L, "[\"ID\" INTEGER, \"NAME\" VARCHAR, 
\"EMPLOY_DATE\" TIMESTAMP, \"ZIP\" INTEGER]",
-                    "[\":ID\" INTEGER, \"0:NAME\" VARCHAR, \"0:EMPLOY_DATE\" 
DECIMAL, \"0:ZIP\" INTEGER]", invalidRowsQuery);
+            Arrays.asList(dataTableFullName, indexTableFullName, 
scrutinyTimeMillis,
+                SourceTable.DATA_TABLE_SOURCE.name(), 
Arrays.toString(argValues), 3L, 0L, 1L,
+                2L, 1L, 1L, "[\"ID\" INTEGER, \"NAME\" VARCHAR, 
\"EMPLOY_DATE\" TIMESTAMP, \"ZIP\" INTEGER]",
+                "[\":ID\" INTEGER, \"0:NAME\" VARCHAR, \"0:EMPLOY_DATE\" 
DECIMAL, \"0:ZIP\" INTEGER]", invalidRowsQuery);
+        if (dataTableDdl.contains("SALT_BUCKETS")) {
+            expected = Arrays.asList(dataTableFullName, indexTableFullName, 
scrutinyTimeMillis,
+                SourceTable.DATA_TABLE_SOURCE.name(), 
Arrays.toString(argValues), 3L, 0L, 1L,
+                2L, 1L, 2L, "[\"ID\" INTEGER, \"NAME\" VARCHAR, 
\"EMPLOY_DATE\" TIMESTAMP, \"ZIP\" INTEGER]",
+                "[\":ID\" INTEGER, \"0:NAME\" VARCHAR, \"0:EMPLOY_DATE\" 
DECIMAL, \"0:ZIP\" INTEGER]", invalidRowsQuery);
+        }
+
         assertRsValues(metadataRs, expected);
         String missingTargetQuery = 
metadataRs.getString("INVALID_ROWS_QUERY_MISSING_TARGET");
         rs = conn.createStatement().executeQuery(missingTargetQuery);
@@ -485,8 +541,7 @@ public class IndexScrutinyToolIT extends BaseTest {
     }
 
     private int countRows(String tableFullName) throws SQLException {
-        ResultSet count =
-                conn.createStatement().executeQuery("select count(*) from " + 
tableFullName);
+        ResultSet count = conn.createStatement().executeQuery("select count(*) 
from " + tableFullName);
         count.next();
         int numRows = count.getInt(1);
         return numRows;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2a867a84/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/IndexColumnNames.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/IndexColumnNames.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/IndexColumnNames.java
index 5daa1ed..6f2959f 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/IndexColumnNames.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/IndexColumnNames.java
@@ -52,7 +52,19 @@ public class IndexColumnNames {
         this.pdataTable = pdataTable;
         this.pindexTable = pindexTable;
         List<PColumn> pindexCols = pindexTable.getColumns();
+        List<PColumn> pkColumns = pindexTable.getPKColumns();
         Set<String> indexColsAdded = new HashSet<String>();
+        int offset = 0;
+        if (pindexTable.getBucketNum() != null) {
+            offset++;
+        }
+        if (pindexTable.getViewIndexId() != null) {
+            offset++;
+        }
+        if (offset > 0) {
+            pindexCols = pindexCols.subList(offset, pindexCols.size());
+            pkColumns = pkColumns.subList(offset, pkColumns.size());
+        }
 
         // first add the data pk columns
         for (PColumn indexCol : pindexCols) {
@@ -68,7 +80,7 @@ public class IndexColumnNames {
         }
 
         // then the rest of the index pk columns
-        for (PColumn indexPkCol : pindexTable.getPKColumns()) {
+        for (PColumn indexPkCol : pkColumns) {
             String indexColName = indexPkCol.getName().getString();
             if (!indexColsAdded.contains(indexColName)) {
                 indexPkColNames.add(indexColName);
@@ -81,7 +93,7 @@ public class IndexColumnNames {
         }
 
         // then the covered columns (rest of the columns)
-        for (PColumn indexCol : pindexTable.getColumns()) {
+        for (PColumn indexCol : pindexCols) {
             String indexColName = indexCol.getName().getString();
             if (!indexColsAdded.contains(indexColName)) {
                 indexNonPkColNames.add(indexColName);

Reply via email to