Repository: phoenix Updated Branches: refs/heads/master e6f0b62de -> 9e03a48fb
PHOENIX-2209 Building Local Index Asynchronously via IndexTool fails to populate index table(Rajeshbabu) Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/9e03a48f Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/9e03a48f Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/9e03a48f Branch: refs/heads/master Commit: 9e03a48fb3c76f4a53c11fc6ede21ad573f80157 Parents: e6f0b62 Author: Rajeshbabu Chintaguntla <[email protected]> Authored: Wed Jun 22 01:04:07 2016 +0530 Committer: Rajeshbabu Chintaguntla <[email protected]> Committed: Wed Jun 22 01:04:07 2016 +0530 ---------------------------------------------------------------------- .../org/apache/phoenix/end2end/IndexToolIT.java | 5 +- .../apache/phoenix/compile/UpsertCompiler.java | 48 ++++++++++++++++++-- .../java/org/apache/phoenix/util/IndexUtil.java | 21 +++++++++ 3 files changed, 69 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/9e03a48f/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java index 9fb9e0a..cb013c8 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java @@ -80,6 +80,7 @@ public class IndexToolIT extends BaseOwnClusterHBaseManagedTimeIT { optionBuilder.append(","); optionBuilder.append(" TRANSACTIONAL=true "); } + optionBuilder.append(" SPLIT ON(1,2)"); this.tableDDLOptions = optionBuilder.toString(); } @@ -143,7 +144,7 @@ public class IndexToolIT extends BaseOwnClusterHBaseManagedTimeIT { String actualExplainPlan = QueryUtil.getExplainPlan(rs); //assert we are pulling from data table. - assertEquals(String.format("CLIENT 1-CHUNK PARALLEL 1-WAY ROUND ROBIN FULL SCAN OVER %s", fullTableName), actualExplainPlan); + assertEquals(String.format("CLIENT 3-CHUNK PARALLEL 1-WAY ROUND ROBIN FULL SCAN OVER %s", fullTableName), actualExplainPlan); rs = stmt1.executeQuery(selectSql); assertTrue(rs.next()); @@ -204,7 +205,7 @@ public class IndexToolIT extends BaseOwnClusterHBaseManagedTimeIT { String expectedExplainPlan = ""; if(isLocal) { final String localIndexName = SchemaUtil.getTableName(schemaName, dataTable); - expectedExplainPlan = String.format("CLIENT 1-CHUNK PARALLEL 1-WAY ROUND ROBIN RANGE SCAN OVER %s [1]" + expectedExplainPlan = String.format("CLIENT 3-CHUNK PARALLEL 3-WAY ROUND ROBIN RANGE SCAN OVER %s [1]" + "\n SERVER FILTER BY FIRST KEY ONLY", localIndexName); } else { expectedExplainPlan = String.format("CLIENT 1-CHUNK PARALLEL 1-WAY ROUND ROBIN FULL SCAN OVER %s" http://git-wip-us.apache.org/repos/asf/phoenix/blob/9e03a48f/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java index be6499b..26855aa 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java @@ -113,7 +113,10 @@ import com.google.common.collect.Maps; import com.google.common.collect.Sets; public class UpsertCompiler { - private static void setValues(byte[][] values, int[] pkSlotIndex, int[] columnIndexes, PTable table, Map<ImmutableBytesPtr,RowMutationState> mutation, PhoenixStatement statement, boolean useServerTimestamp) throws SQLException { + private static void setValues(byte[][] values, int[] pkSlotIndex, int[] columnIndexes, + PTable table, Map<ImmutableBytesPtr, RowMutationState> mutation, + PhoenixStatement statement, boolean useServerTimestamp, IndexMaintainer maintainer, + byte[][] viewConstants) throws SQLException { Map<PColumn,byte[]> columnValues = Maps.newHashMapWithExpectedSize(columnIndexes.length); byte[][] pkValues = new byte[table.getPKColumns().size()][]; // If the table uses salting, the first byte is the salting byte, set to an empty array @@ -144,6 +147,19 @@ public class UpsertCompiler { } ImmutableBytesPtr ptr = new ImmutableBytesPtr(); table.newKey(ptr, pkValues); + if (table.getIndexType() == IndexType.LOCAL && maintainer != null) { + byte[] rowKey = maintainer.buildDataRowKey(ptr, viewConstants); + HRegionLocation region = + statement.getConnection().getQueryServices() + .getTableRegionLocation(table.getParentName().getBytes(), rowKey); + byte[] regionPrefix = + region.getRegionInfo().getStartKey().length == 0 ? new byte[region + .getRegionInfo().getEndKey().length] : region.getRegionInfo() + .getStartKey(); + if (regionPrefix.length != 0) { + ptr.set(ScanRanges.prefixKey(ptr.get(), 0, regionPrefix, regionPrefix.length)); + } + } mutation.put(ptr, new RowMutationState(columnValues, statement.getConnection().getStatementExecutionCounter(), rowTsColInfo)); } @@ -160,6 +176,19 @@ public class UpsertCompiler { int rowCount = 0; Map<ImmutableBytesPtr, RowMutationState> mutation = Maps.newHashMapWithExpectedSize(batchSize); PTable table = tableRef.getTable(); + IndexMaintainer indexMaintainer = null; + byte[][] viewConstants = null; + if (table.getIndexType() == IndexType.LOCAL) { + PTable parentTable = + statement + .getConnection() + .getMetaDataCache() + .getTableRef( + new PTableKey(statement.getConnection().getTenantId(), table + .getParentName().getString())).getTable(); + indexMaintainer = table.getIndexMaintainer(parentTable, connection); + viewConstants = IndexUtil.getViewConstants(parentTable); + } try (ResultSet rs = new PhoenixResultSet(iterator, projector, childContext)) { ImmutableBytesWritable ptr = new ImmutableBytesWritable(); while (rs.next()) { @@ -185,7 +214,7 @@ public class UpsertCompiler { table.rowKeyOrderOptimizable()); values[i] = ByteUtil.copyKeyBytesIfNecessary(ptr); } - setValues(values, pkSlotIndexes, columnIndexes, table, mutation, statement, useServerTimestamp); + setValues(values, pkSlotIndexes, columnIndexes, table, mutation, statement, useServerTimestamp, indexMaintainer, viewConstants); rowCount++; // Commit a batch if auto commit is true and we're at our batch size if (isAutoCommit && rowCount % batchSize == 0) { @@ -927,7 +956,20 @@ public class UpsertCompiler { } } Map<ImmutableBytesPtr, RowMutationState> mutation = Maps.newHashMapWithExpectedSize(1); - setValues(values, pkSlotIndexes, columnIndexes, table, mutation, statement, useServerTimestamp); + IndexMaintainer indexMaintainer = null; + byte[][] viewConstants = null; + if (table.getIndexType() == IndexType.LOCAL) { + PTable parentTable = + statement + .getConnection() + .getMetaDataCache() + .getTableRef( + new PTableKey(statement.getConnection().getTenantId(), + table.getParentName().getString())).getTable(); + indexMaintainer = table.getIndexMaintainer(parentTable, connection); + viewConstants = IndexUtil.getViewConstants(parentTable); + } + setValues(values, pkSlotIndexes, columnIndexes, table, mutation, statement, useServerTimestamp, indexMaintainer, viewConstants); return new MutationState(tableRef, mutation, 0, maxSize, connection); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/9e03a48f/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java index 5532d71..86fa8ca 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java @@ -77,6 +77,7 @@ import org.apache.phoenix.schema.KeyValueSchema; import org.apache.phoenix.schema.PColumn; import org.apache.phoenix.schema.PColumnFamily; import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.PTableType; import org.apache.phoenix.schema.TableNotFoundException; import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.schema.tuple.ResultTuple; @@ -638,4 +639,24 @@ public class IndexUtil { return col.getExpressionStr() == null ? IndexUtil.getCaseSensitiveDataColumnFullName(col.getName().getString()) : col.getExpressionStr(); } + + public static byte[][] getViewConstants(PTable dataTable) { + if (dataTable.getType() != PTableType.VIEW) return null; + int dataPosOffset = (dataTable.getBucketNum() != null ? 1 : 0) + (dataTable.isMultiTenant() ? 1 : 0); + ImmutableBytesWritable ptr = new ImmutableBytesWritable(); + List<byte[]> viewConstants = new ArrayList<byte[]>(); + List<PColumn> dataPkColumns = dataTable.getPKColumns(); + for (int i = dataPosOffset; i < dataPkColumns.size(); i++) { + PColumn dataPKColumn = dataPkColumns.get(i); + if (dataPKColumn.getViewConstant() != null) { + if (IndexUtil.getViewConstantValue(dataPKColumn, ptr)) { + viewConstants.add(ByteUtil.copyKeyBytesIfNecessary(ptr)); + } else { + throw new IllegalStateException(); + } + } + } + return viewConstants.isEmpty() ? null : viewConstants + .toArray(new byte[viewConstants.size()][]); + } }
