Repository: phoenix Updated Branches: refs/heads/calcite f00a3c981 -> 4060f3bcd
PHOENIX-2262 Improve collation for salted table Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/4060f3bc Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/4060f3bc Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/4060f3bc Branch: refs/heads/calcite Commit: 4060f3bcd4300ce79aba2ef63672c8593ec35ea5 Parents: f00a3c9 Author: maryannxue <[email protected]> Authored: Wed Nov 4 21:17:58 2015 -0500 Committer: maryannxue <[email protected]> Committed: Wed Nov 4 21:17:58 2015 -0500 ---------------------------------------------------------------------- .../org/apache/phoenix/calcite/CalciteIT.java | 23 ++++++----- .../apache/phoenix/calcite/PhoenixSchema.java | 42 +++++++++++++++++--- .../apache/phoenix/calcite/PhoenixTable.java | 22 +++++----- .../phoenix/calcite/rel/PhoenixTableScan.java | 27 ++++++++++--- .../java/org/apache/phoenix/util/ScanUtil.java | 11 ++++- .../phoenix/calcite/ToExpressionTest.java | 2 +- 6 files changed, 95 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/4060f3bc/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteIT.java b/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteIT.java index 6623b37..b1720cf 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteIT.java @@ -762,7 +762,7 @@ public class CalciteIT extends BaseClientManagedTimeIT { start(false).sql("select mypk0, avg(mypk1) from " + SALTED_TABLE_NAME + " group by mypk0") .explainIs("PhoenixToEnumerableConverter\n" + " PhoenixClientProject(MYPK0=[$0], EXPR$1=[CAST(/($1, $2)):INTEGER NOT NULL])\n" + - " PhoenixServerAggregate(group=[{0}], agg#0=[$SUM0($1)], agg#1=[COUNT()], isOrdered=[false])\n" + + " PhoenixServerAggregate(group=[{0}], agg#0=[$SUM0($1)], agg#1=[COUNT()], isOrdered=[true])\n" + " PhoenixTableScan(table=[[phoenix, SALTED_TEST_TABLE]])\n") .resultIs(new Object[][] { {1, 2}, @@ -1465,14 +1465,15 @@ public class CalciteIT extends BaseClientManagedTimeIT { start(true).sql("select count(*) from " + NOSALT_TABLE_NAME + " where col0 > 3") .explainIs("PhoenixToEnumerableConverter\n" + " PhoenixServerAggregate(group=[{}], EXPR$0=[COUNT()])\n" + - " PhoenixTableScan(table=[[phoenix, IDXSALTED_NOSALT_TEST_TABLE]], filter=[>(CAST($0):INTEGER, 3)])\n") + " PhoenixServerProject(DUMMY=[0])\n" + + " PhoenixTableScan(table=[[phoenix, IDXSALTED_NOSALT_TEST_TABLE:unordered]], filter=[>(CAST($0):INTEGER, 3)])\n") .resultIs(new Object[][]{{2L}}) .close(); start(true).sql("select mypk0, mypk1, col0 from " + NOSALT_TABLE_NAME + " where col0 <= 4") .explainIs("PhoenixToEnumerableConverter\n" + " PhoenixToClientConverter\n" + " PhoenixServerProject(MYPK0=[$1], MYPK1=[$2], COL0=[CAST($0):INTEGER])\n" + - " PhoenixTableScan(table=[[phoenix, IDXSALTED_NOSALT_TEST_TABLE]], filter=[<=(CAST($0):INTEGER, 4)])\n") + " PhoenixTableScan(table=[[phoenix, IDXSALTED_NOSALT_TEST_TABLE:unordered]], filter=[<=(CAST($0):INTEGER, 4)])\n") .resultIs(new Object[][] { {2, 3, 4}, {1, 2, 3}}) @@ -1480,7 +1481,7 @@ public class CalciteIT extends BaseClientManagedTimeIT { start(true).sql("select * from " + SALTED_TABLE_NAME + " where mypk0 < 3") .explainIs("PhoenixToEnumerableConverter\n" + " PhoenixToClientConverter\n" + - " PhoenixTableScan(table=[[phoenix, SALTED_TEST_TABLE]], filter=[<($0, 3)])\n") + " PhoenixTableScan(table=[[phoenix, SALTED_TEST_TABLE:unordered]], filter=[<($0, 3)])\n") .resultIs(new Object[][] { {1, 2, 3, 4}, {2, 3, 4, 5}}) @@ -1488,14 +1489,15 @@ public class CalciteIT extends BaseClientManagedTimeIT { start(true).sql("select count(*) from " + SALTED_TABLE_NAME + " where col0 > 3") .explainIs("PhoenixToEnumerableConverter\n" + " PhoenixServerAggregate(group=[{}], EXPR$0=[COUNT()])\n" + - " PhoenixTableScan(table=[[phoenix, IDX_SALTED_TEST_TABLE]], filter=[>(CAST($0):INTEGER, 3)])\n") + " PhoenixServerProject(DUMMY=[0])\n" + + " PhoenixTableScan(table=[[phoenix, IDX_SALTED_TEST_TABLE:unordered]], filter=[>(CAST($0):INTEGER, 3)])\n") .resultIs(new Object[][]{{2L}}) .close(); start(true).sql("select mypk0, mypk1, col0 from " + SALTED_TABLE_NAME + " where col0 <= 4") .explainIs("PhoenixToEnumerableConverter\n" + " PhoenixToClientConverter\n" + " PhoenixServerProject(MYPK0=[$1], MYPK1=[$2], COL0=[CAST($0):INTEGER])\n" + - " PhoenixTableScan(table=[[phoenix, IDX_SALTED_TEST_TABLE]], filter=[<=(CAST($0):INTEGER, 4)])\n") + " PhoenixTableScan(table=[[phoenix, IDX_SALTED_TEST_TABLE:unordered]], filter=[<=(CAST($0):INTEGER, 4)])\n") .resultIs(new Object[][] { {2, 3, 4}, {1, 2, 3}}) @@ -1503,10 +1505,11 @@ public class CalciteIT extends BaseClientManagedTimeIT { start(true).sql("select count(*) from " + SALTED_TABLE_NAME + " where col1 > 4") .explainIs("PhoenixToEnumerableConverter\n" + " PhoenixServerAggregate(group=[{}], EXPR$0=[COUNT()])\n" + - " PhoenixTableScan(table=[[phoenix, IDXSALTED_SALTED_TEST_TABLE]], filter=[>(CAST($0):INTEGER, 4)])\n") + " PhoenixServerProject(DUMMY=[0])\n" + + " PhoenixTableScan(table=[[phoenix, IDXSALTED_SALTED_TEST_TABLE:unordered]], filter=[>(CAST($0):INTEGER, 4)])\n") .resultIs(new Object[][]{{2L}}) .close(); - start(true).sql("select * from " + SALTED_TABLE_NAME + " where col1 <= 5") + start(true).sql("select * from " + SALTED_TABLE_NAME + " where col1 <= 5 order by col1") .explainIs("PhoenixToEnumerableConverter\n" + " PhoenixToClientConverter\n" + " PhoenixServerProject(MYPK0=[$1], MYPK1=[$2], COL0=[$3], COL1=[CAST($0):INTEGER])\n" + @@ -1519,10 +1522,10 @@ public class CalciteIT extends BaseClientManagedTimeIT { .explainIs("PhoenixToEnumerableConverter\n" + " PhoenixToClientConverter\n" + " PhoenixServerJoin(condition=[AND(=($0, $4), =($1, $5))], joinType=[inner])\n" + - " PhoenixTableScan(table=[[phoenix, SALTED_TEST_TABLE]], filter=[>($0, 1)])\n" + + " PhoenixTableScan(table=[[phoenix, SALTED_TEST_TABLE:unordered]], filter=[>($0, 1)])\n" + " PhoenixToClientConverter\n" + " PhoenixServerProject(MYPK0=[$1], MYPK1=[$2], COL0=[$3], COL1=[CAST($0):INTEGER])\n" + - " PhoenixTableScan(table=[[phoenix, IDXSALTED_SALTED_TEST_TABLE]], filter=[<(CAST($0):INTEGER, 6)])\n") + " PhoenixTableScan(table=[[phoenix, IDXSALTED_SALTED_TEST_TABLE:unordered]], filter=[<(CAST($0):INTEGER, 6)])\n") .resultIs(new Object[][] { {2, 3, 4, 5, 2, 3, 4, 5}}) .close(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/4060f3bc/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSchema.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSchema.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSchema.java index b0afbc7..bc2d424 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSchema.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSchema.java @@ -20,6 +20,7 @@ import org.apache.phoenix.parse.TableName; import org.apache.phoenix.schema.MetaDataClient; import org.apache.phoenix.schema.PColumn; import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.PTable.IndexType; import org.apache.phoenix.schema.PTable.ViewType; import org.apache.phoenix.schema.PTableType; import org.apache.phoenix.schema.TableRef; @@ -42,6 +43,7 @@ import java.util.Set; */ public class PhoenixSchema implements Schema { public static final Factory FACTORY = new Factory(); + private static final String UNORDERED_SUFFIX = ":unordered"; protected final String name; protected final String schemaName; @@ -99,7 +101,12 @@ public class PhoenixSchema implements Schema { ImmutableList.<ColumnDef>of()), pc); final List<TableRef> tables = x.getTables(); assert tables.size() == 1; - tableMap.put(tableName, tables.get(0).getTable()); + final PTable pTable = tables.get(0).getTable(); + tableMap.put(tableName, pTable); + if (pTable.getBucketNum() != null || pTable.getIndexType() == IndexType.LOCAL) { + final String unorderedTableName = tableName + UNORDERED_SUFFIX; + tableMap.put(unorderedTableName, pTable); + } } else { String viewSql = rs.getString(PhoenixDatabaseMetaData.VIEW_STATEMENT); String viewType = rs.getString(PhoenixDatabaseMetaData.VIEW_TYPE); @@ -134,7 +141,7 @@ public class PhoenixSchema implements Schema { @Override public Table getTable(String name) { PTable table = tableMap.get(name); - return table == null ? null : new PhoenixTable(pc, table); + return table == null ? null : new PhoenixTable(pc, table, !isUnorderedTableName(name)); } @Override @@ -183,9 +190,20 @@ public class PhoenixSchema implements Schema { public void defineIndexesAsMaterializations() { List<String> path = calciteSchema.path(null); - for (PTable table : tableMap.values()) { - for (PTable index : table.getIndexes()) { - addMaterialization(table, index, path); + for (Map.Entry<String, PTable> entry : tableMap.entrySet()) { + final String tableName = entry.getKey(); + final PTable table = entry.getValue(); + if (!isUnorderedTableName(tableName)) { + for (PTable index : table.getIndexes()) { + addMaterialization(table, index, path); + } + } + } + for (Map.Entry<String, PTable> entry : tableMap.entrySet()) { + final String tableName = entry.getKey(); + final PTable table = entry.getValue(); + if (isUnorderedTableName(tableName)) { + addUnorderedAsMaterialization(tableName, table, path); } } } @@ -205,6 +223,20 @@ public class PhoenixSchema implements Schema { MaterializationService.instance().defineMaterialization( calciteSchema, null, sb.toString(), path, index.getTableName().getString(), true, true); } + + protected void addUnorderedAsMaterialization(String tableName, PTable table, List<String> path) { + StringBuffer sb = new StringBuffer(); + sb.append("SELECT * FROM ") + .append("\"") + .append(table.getTableName().getString()) + .append("\""); + MaterializationService.instance().defineMaterialization( + calciteSchema, null, sb.toString(), path, tableName, true, true); + } + + private boolean isUnorderedTableName(String tableName) { + return tableName.endsWith(UNORDERED_SUFFIX); + } /** Schema factory that creates a * {@link org.apache.phoenix.calcite.PhoenixSchema}. http://git-wip-us.apache.org/repos/asf/phoenix/blob/4060f3bc/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTable.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTable.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTable.java index 4be7450..272cd47 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTable.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTable.java @@ -42,21 +42,25 @@ public class PhoenixTable extends AbstractTable implements TranslatableTable { public final ImmutableBitSet pkBitSet; public final RelCollation collation; public final PhoenixConnection pc; + public final boolean requireRowKeyOrder; public static int getStartingColumnPosition(PTable pTable) { return (pTable.getBucketNum() == null ? 0 : 1) + (pTable.isMultiTenant() ? 1 : 0) + (pTable.getViewIndexId() == null ? 0 : 1); } - public PhoenixTable(PhoenixConnection pc, PTable pTable) { + public PhoenixTable(PhoenixConnection pc, PTable pTable, boolean requireRowKeyOrder) { this.pc = Preconditions.checkNotNull(pc); this.pTable = Preconditions.checkNotNull(pTable); + this.requireRowKeyOrder = requireRowKeyOrder; List<Integer> pkPositions = Lists.<Integer> newArrayList(); List<RelFieldCollation> fieldCollations = Lists.<RelFieldCollation> newArrayList(); - for (PColumn column : pTable.getPKColumns()) { - int position = column.getPosition(); - SortOrder sortOrder = column.getSortOrder(); - pkPositions.add(position); - fieldCollations.add(new RelFieldCollation(position, sortOrder == SortOrder.ASC ? Direction.ASCENDING : Direction.DESCENDING)); + if (requireRowKeyOrder) { + for (PColumn column : pTable.getPKColumns()) { + int position = column.getPosition(); + SortOrder sortOrder = column.getSortOrder(); + pkPositions.add(position); + fieldCollations.add(new RelFieldCollation(position, sortOrder == SortOrder.ASC ? Direction.ASCENDING : Direction.DESCENDING)); + } } this.pkBitSet = ImmutableBitSet.of(pkPositions); this.collation = RelCollationTraitDef.INSTANCE.canonize(RelCollations.of(fieldCollations)); @@ -128,9 +132,9 @@ public class PhoenixTable extends AbstractTable implements TranslatableTable { @Override public List<RelCollation> getCollations() { - return pTable.getBucketNum() == null ? - ImmutableList.<RelCollation> of(collation) - : ImmutableList.<RelCollation>of(); + return collation.getFieldCollations().isEmpty() ? + ImmutableList.<RelCollation>of() + : ImmutableList.<RelCollation>of(collation); } @Override http://git-wip-us.apache.org/repos/asf/phoenix/blob/4060f3bc/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableScan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableScan.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableScan.java index bef650d..567ddc0 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableScan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableScan.java @@ -46,10 +46,10 @@ import org.apache.phoenix.schema.PName; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.schema.types.PDataType; +import org.apache.phoenix.util.ScanUtil; import org.apache.phoenix.util.SchemaUtil; import com.google.common.base.Supplier; -import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; /** @@ -67,10 +67,7 @@ public class PhoenixTableScan extends TableScan implements PhoenixRel { .replaceIfs(RelCollationTraitDef.INSTANCE, new Supplier<List<RelCollation>>() { public List<RelCollation> get() { - if (table != null) { - return table.unwrap(PhoenixTable.class).getStatistic().getCollations(); - } - return ImmutableList.of(); + return table.unwrap(PhoenixTable.class).getStatistic().getCollations(); } }); return new PhoenixTableScan(cluster, traits, table, filter); @@ -149,6 +146,19 @@ public class PhoenixTableScan extends TableScan implements PhoenixRel { } else if (table.unwrap(PhoenixTable.class).getTable().getParentName() != null){ rowCount = addEpsilon(rowCount); } + if (requireRowKeyOrder()) { + // We don't want to make a big difference here. The idea is to avoid + // forcing row key order whenever the order is absolutely useless. + // E.g. in "select count(*) from t" we do not need the row key order; + // while in "select * from t order by pk0" we should force row key + // order to avoid sorting. + // Another case is "select pk0, count(*) from t", where we'd like to + // choose the row key ordered TableScan rel so that the Aggregate rel + // above it can be an stream aggregate, although at runtime this will + // eventually be an AggregatePlan, in which the "forceRowKeyOrder" + // flag takes no effect. + rowCount = addEpsilon(rowCount); + } int fieldCount = this.table.getRowType().getFieldCount(); return planner.getCostFactory() .makeCost(rowCount * 2 * fieldCount / (fieldCount + 1), rowCount + 1, 0) @@ -202,6 +212,9 @@ public class PhoenixTableScan extends TableScan implements PhoenixRel { Integer limit = null; OrderBy orderBy = OrderBy.EMPTY_ORDER_BY; ParallelIteratorFactory iteratorFactory = null; + if (requireRowKeyOrder()) { + ScanUtil.setForceRowKeyOrder(context.getScan()); + } return new ScanPlan(context, select, tableRef, RowProjector.EMPTY_PROJECTOR, limit, orderBy, iteratorFactory, true, dynamicFilter); } catch (SQLException e) { throw new RuntimeException(e); @@ -234,6 +247,10 @@ public class PhoenixTableScan extends TableScan implements PhoenixRel { } } } + + private boolean requireRowKeyOrder() { + return table.unwrap(PhoenixTable.class).requireRowKeyOrder; + } private double addEpsilon(double d) { assert d >= 0d; http://git-wip-us.apache.org/repos/asf/phoenix/blob/4060f3bc/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java index 7b76a2b..c80d9c5 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java @@ -84,6 +84,8 @@ public class ScanUtil { Arrays.fill(MAX_FILL_LENGTH_FOR_PREVIOUS_KEY, (byte)-1); } private static final byte[] ZERO_BYTE_ARRAY = new byte[1024]; + + private static final String FORCE_ROW_KEY_ORDER = "_ForceRowKeyOrder"; private ScanUtil() { } @@ -734,9 +736,14 @@ public class ScanUtil { return fetchSize > 1 && !shouldRowsBeInRowKeyOrder(orderBy, context) && orderBy.getOrderByExpressions().isEmpty(); } + public static void setForceRowKeyOrder(Scan scan) { + scan.setAttribute(FORCE_ROW_KEY_ORDER, Bytes.toBytes(Boolean.TRUE.toString())); + } + public static boolean forceRowKeyOrder(StatementContext context) { - return context.getConnection().getQueryServices().getProps() - .getBoolean(QueryServices.FORCE_ROW_KEY_ORDER_ATTRIB, QueryServicesOptions.DEFAULT_FORCE_ROW_KEY_ORDER); + return context.getScan().getAttribute(FORCE_ROW_KEY_ORDER) != null + && context.getConnection().getQueryServices().getProps() + .getBoolean(QueryServices.FORCE_ROW_KEY_ORDER_ATTRIB, QueryServicesOptions.DEFAULT_FORCE_ROW_KEY_ORDER); } public static boolean shouldRowsBeInRowKeyOrder(OrderBy orderBy, StatementContext context) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/4060f3bc/phoenix-core/src/test/java/org/apache/phoenix/calcite/ToExpressionTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/calcite/ToExpressionTest.java b/phoenix-core/src/test/java/org/apache/phoenix/calcite/ToExpressionTest.java index 3734b4c..bd239bd 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/calcite/ToExpressionTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/calcite/ToExpressionTest.java @@ -165,7 +165,7 @@ public class ToExpressionTest extends BaseConnectionlessQueryTest { return null; PTable table = rootTables.get(name); - return new PhoenixTable(pc, table); + return new PhoenixTable(pc, table, true); } @Override
