Repository: phoenix
Updated Branches:
  refs/heads/calcite f00a3c981 -> 4060f3bcd


PHOENIX-2262 Improve collation for salted table


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/4060f3bc
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/4060f3bc
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/4060f3bc

Branch: refs/heads/calcite
Commit: 4060f3bcd4300ce79aba2ef63672c8593ec35ea5
Parents: f00a3c9
Author: maryannxue <[email protected]>
Authored: Wed Nov 4 21:17:58 2015 -0500
Committer: maryannxue <[email protected]>
Committed: Wed Nov 4 21:17:58 2015 -0500

----------------------------------------------------------------------
 .../org/apache/phoenix/calcite/CalciteIT.java   | 23 ++++++-----
 .../apache/phoenix/calcite/PhoenixSchema.java   | 42 +++++++++++++++++---
 .../apache/phoenix/calcite/PhoenixTable.java    | 22 +++++-----
 .../phoenix/calcite/rel/PhoenixTableScan.java   | 27 ++++++++++---
 .../java/org/apache/phoenix/util/ScanUtil.java  | 11 ++++-
 .../phoenix/calcite/ToExpressionTest.java       |  2 +-
 6 files changed, 95 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/4060f3bc/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteIT.java
index 6623b37..b1720cf 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteIT.java
@@ -762,7 +762,7 @@ public class CalciteIT extends BaseClientManagedTimeIT {
         start(false).sql("select mypk0, avg(mypk1) from " + SALTED_TABLE_NAME 
+ " group by mypk0")
                 .explainIs("PhoenixToEnumerableConverter\n" +
                            "  PhoenixClientProject(MYPK0=[$0], 
EXPR$1=[CAST(/($1, $2)):INTEGER NOT NULL])\n" +
-                           "    PhoenixServerAggregate(group=[{0}], 
agg#0=[$SUM0($1)], agg#1=[COUNT()], isOrdered=[false])\n" +
+                           "    PhoenixServerAggregate(group=[{0}], 
agg#0=[$SUM0($1)], agg#1=[COUNT()], isOrdered=[true])\n" +
                            "      PhoenixTableScan(table=[[phoenix, 
SALTED_TEST_TABLE]])\n")
                 .resultIs(new Object[][] {
                         {1, 2},
@@ -1465,14 +1465,15 @@ public class CalciteIT extends BaseClientManagedTimeIT {
         start(true).sql("select count(*) from " + NOSALT_TABLE_NAME + " where 
col0 > 3")
                 .explainIs("PhoenixToEnumerableConverter\n" +
                            "  PhoenixServerAggregate(group=[{}], 
EXPR$0=[COUNT()])\n" +
-                           "    PhoenixTableScan(table=[[phoenix, 
IDXSALTED_NOSALT_TEST_TABLE]], filter=[>(CAST($0):INTEGER, 3)])\n")
+                           "    PhoenixServerProject(DUMMY=[0])\n" +
+                           "      PhoenixTableScan(table=[[phoenix, 
IDXSALTED_NOSALT_TEST_TABLE:unordered]], filter=[>(CAST($0):INTEGER, 3)])\n")
                 .resultIs(new Object[][]{{2L}})
                 .close();
         start(true).sql("select mypk0, mypk1, col0 from " + NOSALT_TABLE_NAME 
+ " where col0 <= 4")
                 .explainIs("PhoenixToEnumerableConverter\n" +
                            "  PhoenixToClientConverter\n" +
                            "    PhoenixServerProject(MYPK0=[$1], MYPK1=[$2], 
COL0=[CAST($0):INTEGER])\n" +
-                           "      PhoenixTableScan(table=[[phoenix, 
IDXSALTED_NOSALT_TEST_TABLE]], filter=[<=(CAST($0):INTEGER, 4)])\n")
+                           "      PhoenixTableScan(table=[[phoenix, 
IDXSALTED_NOSALT_TEST_TABLE:unordered]], filter=[<=(CAST($0):INTEGER, 4)])\n")
                 .resultIs(new Object[][] {
                         {2, 3, 4},
                         {1, 2, 3}})
@@ -1480,7 +1481,7 @@ public class CalciteIT extends BaseClientManagedTimeIT {
         start(true).sql("select * from " + SALTED_TABLE_NAME + " where mypk0 < 
3")
                 .explainIs("PhoenixToEnumerableConverter\n" +
                            "  PhoenixToClientConverter\n" +
-                           "    PhoenixTableScan(table=[[phoenix, 
SALTED_TEST_TABLE]], filter=[<($0, 3)])\n")
+                           "    PhoenixTableScan(table=[[phoenix, 
SALTED_TEST_TABLE:unordered]], filter=[<($0, 3)])\n")
                 .resultIs(new Object[][] {
                         {1, 2, 3, 4},
                         {2, 3, 4, 5}})
@@ -1488,14 +1489,15 @@ public class CalciteIT extends BaseClientManagedTimeIT {
         start(true).sql("select count(*) from " + SALTED_TABLE_NAME + " where 
col0 > 3")
                 .explainIs("PhoenixToEnumerableConverter\n" +
                            "  PhoenixServerAggregate(group=[{}], 
EXPR$0=[COUNT()])\n" +
-                           "    PhoenixTableScan(table=[[phoenix, 
IDX_SALTED_TEST_TABLE]], filter=[>(CAST($0):INTEGER, 3)])\n")
+                           "    PhoenixServerProject(DUMMY=[0])\n" +
+                           "      PhoenixTableScan(table=[[phoenix, 
IDX_SALTED_TEST_TABLE:unordered]], filter=[>(CAST($0):INTEGER, 3)])\n")
                 .resultIs(new Object[][]{{2L}})
                 .close();
         start(true).sql("select mypk0, mypk1, col0 from " + SALTED_TABLE_NAME 
+ " where col0 <= 4")
                 .explainIs("PhoenixToEnumerableConverter\n" +
                            "  PhoenixToClientConverter\n" +
                            "    PhoenixServerProject(MYPK0=[$1], MYPK1=[$2], 
COL0=[CAST($0):INTEGER])\n" +
-                           "      PhoenixTableScan(table=[[phoenix, 
IDX_SALTED_TEST_TABLE]], filter=[<=(CAST($0):INTEGER, 4)])\n")
+                           "      PhoenixTableScan(table=[[phoenix, 
IDX_SALTED_TEST_TABLE:unordered]], filter=[<=(CAST($0):INTEGER, 4)])\n")
                 .resultIs(new Object[][] {
                         {2, 3, 4},
                         {1, 2, 3}})
@@ -1503,10 +1505,11 @@ public class CalciteIT extends BaseClientManagedTimeIT {
         start(true).sql("select count(*) from " + SALTED_TABLE_NAME + " where 
col1 > 4")
                 .explainIs("PhoenixToEnumerableConverter\n" +
                            "  PhoenixServerAggregate(group=[{}], 
EXPR$0=[COUNT()])\n" +
-                           "    PhoenixTableScan(table=[[phoenix, 
IDXSALTED_SALTED_TEST_TABLE]], filter=[>(CAST($0):INTEGER, 4)])\n")
+                           "    PhoenixServerProject(DUMMY=[0])\n" +
+                           "      PhoenixTableScan(table=[[phoenix, 
IDXSALTED_SALTED_TEST_TABLE:unordered]], filter=[>(CAST($0):INTEGER, 4)])\n")
                 .resultIs(new Object[][]{{2L}})
                 .close();
-        start(true).sql("select * from " + SALTED_TABLE_NAME + " where col1 <= 
5")
+        start(true).sql("select * from " + SALTED_TABLE_NAME + " where col1 <= 
5 order by col1")
                 .explainIs("PhoenixToEnumerableConverter\n" +
                            "  PhoenixToClientConverter\n" +
                            "    PhoenixServerProject(MYPK0=[$1], MYPK1=[$2], 
COL0=[$3], COL1=[CAST($0):INTEGER])\n" +
@@ -1519,10 +1522,10 @@ public class CalciteIT extends BaseClientManagedTimeIT {
                 .explainIs("PhoenixToEnumerableConverter\n" +
                            "  PhoenixToClientConverter\n" +
                            "    PhoenixServerJoin(condition=[AND(=($0, $4), 
=($1, $5))], joinType=[inner])\n" +
-                           "      PhoenixTableScan(table=[[phoenix, 
SALTED_TEST_TABLE]], filter=[>($0, 1)])\n" +
+                           "      PhoenixTableScan(table=[[phoenix, 
SALTED_TEST_TABLE:unordered]], filter=[>($0, 1)])\n" +
                            "      PhoenixToClientConverter\n" +
                            "        PhoenixServerProject(MYPK0=[$1], 
MYPK1=[$2], COL0=[$3], COL1=[CAST($0):INTEGER])\n" +
-                           "          PhoenixTableScan(table=[[phoenix, 
IDXSALTED_SALTED_TEST_TABLE]], filter=[<(CAST($0):INTEGER, 6)])\n")
+                           "          PhoenixTableScan(table=[[phoenix, 
IDXSALTED_SALTED_TEST_TABLE:unordered]], filter=[<(CAST($0):INTEGER, 6)])\n")
                 .resultIs(new Object[][] {
                         {2, 3, 4, 5, 2, 3, 4, 5}})
                 .close();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/4060f3bc/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSchema.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSchema.java 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSchema.java
index b0afbc7..bc2d424 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSchema.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSchema.java
@@ -20,6 +20,7 @@ import org.apache.phoenix.parse.TableName;
 import org.apache.phoenix.schema.MetaDataClient;
 import org.apache.phoenix.schema.PColumn;
 import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTable.IndexType;
 import org.apache.phoenix.schema.PTable.ViewType;
 import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.schema.TableRef;
@@ -42,6 +43,7 @@ import java.util.Set;
  */
 public class PhoenixSchema implements Schema {
     public static final Factory FACTORY = new Factory();
+    private static final String UNORDERED_SUFFIX = ":unordered";
     
     protected final String name;
     protected final String schemaName;
@@ -99,7 +101,12 @@ public class PhoenixSchema implements Schema {
                                     ImmutableList.<ColumnDef>of()), pc);
                     final List<TableRef> tables = x.getTables();
                     assert tables.size() == 1;
-                    tableMap.put(tableName, tables.get(0).getTable());
+                    final PTable pTable = tables.get(0).getTable();
+                    tableMap.put(tableName, pTable);
+                    if (pTable.getBucketNum() != null || pTable.getIndexType() 
== IndexType.LOCAL) {
+                        final String unorderedTableName = tableName + 
UNORDERED_SUFFIX;
+                        tableMap.put(unorderedTableName, pTable);
+                    }
                 } else {
                     String viewSql = 
rs.getString(PhoenixDatabaseMetaData.VIEW_STATEMENT);
                     String viewType = 
rs.getString(PhoenixDatabaseMetaData.VIEW_TYPE);
@@ -134,7 +141,7 @@ public class PhoenixSchema implements Schema {
     @Override
     public Table getTable(String name) {
         PTable table = tableMap.get(name);
-        return table == null ? null : new PhoenixTable(pc, table);
+        return table == null ? null : new PhoenixTable(pc, table, 
!isUnorderedTableName(name));
     }
 
     @Override
@@ -183,9 +190,20 @@ public class PhoenixSchema implements Schema {
     
     public void defineIndexesAsMaterializations() {
         List<String> path = calciteSchema.path(null);
-        for (PTable table : tableMap.values()) {
-            for (PTable index : table.getIndexes()) {
-                addMaterialization(table, index, path);
+        for (Map.Entry<String, PTable> entry : tableMap.entrySet()) {
+            final String tableName = entry.getKey();
+            final PTable table = entry.getValue();
+            if (!isUnorderedTableName(tableName)) {
+                for (PTable index : table.getIndexes()) {
+                    addMaterialization(table, index, path);
+                }
+            }
+        }
+        for (Map.Entry<String, PTable> entry : tableMap.entrySet()) {
+            final String tableName = entry.getKey();
+            final PTable table = entry.getValue();
+            if (isUnorderedTableName(tableName)) {
+                addUnorderedAsMaterialization(tableName, table, path);
             }
         }
     }
@@ -205,6 +223,20 @@ public class PhoenixSchema implements Schema {
         MaterializationService.instance().defineMaterialization(
                 calciteSchema, null, sb.toString(), path, 
index.getTableName().getString(), true, true);        
     }
+    
+    protected void addUnorderedAsMaterialization(String tableName, PTable 
table, List<String> path) {
+        StringBuffer sb = new StringBuffer();
+        sb.append("SELECT * FROM ")
+            .append("\"")
+            .append(table.getTableName().getString())
+            .append("\"");
+        MaterializationService.instance().defineMaterialization(
+                calciteSchema, null, sb.toString(), path, tableName, true, 
true);        
+    }
+    
+    private boolean isUnorderedTableName(String tableName) {
+        return tableName.endsWith(UNORDERED_SUFFIX);
+    }
 
     /** Schema factory that creates a
      * {@link org.apache.phoenix.calcite.PhoenixSchema}.

http://git-wip-us.apache.org/repos/asf/phoenix/blob/4060f3bc/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTable.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTable.java 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTable.java
index 4be7450..272cd47 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTable.java
@@ -42,21 +42,25 @@ public class PhoenixTable extends AbstractTable implements 
TranslatableTable {
   public final ImmutableBitSet pkBitSet;
   public final RelCollation collation;
   public final PhoenixConnection pc;
+  public final boolean requireRowKeyOrder;
   
   public static int getStartingColumnPosition(PTable pTable) {
       return (pTable.getBucketNum() == null ? 0 : 1) + (pTable.isMultiTenant() 
? 1 : 0) + (pTable.getViewIndexId() == null ? 0 : 1);
   }
 
-  public PhoenixTable(PhoenixConnection pc, PTable pTable) {
+  public PhoenixTable(PhoenixConnection pc, PTable pTable, boolean 
requireRowKeyOrder) {
       this.pc = Preconditions.checkNotNull(pc);
       this.pTable = Preconditions.checkNotNull(pTable);
+      this.requireRowKeyOrder = requireRowKeyOrder;
       List<Integer> pkPositions = Lists.<Integer> newArrayList();
       List<RelFieldCollation> fieldCollations = Lists.<RelFieldCollation> 
newArrayList();
-      for (PColumn column : pTable.getPKColumns()) {
-          int position = column.getPosition();
-          SortOrder sortOrder = column.getSortOrder();
-          pkPositions.add(position);
-          fieldCollations.add(new RelFieldCollation(position, sortOrder == 
SortOrder.ASC ? Direction.ASCENDING : Direction.DESCENDING));
+      if (requireRowKeyOrder) {
+          for (PColumn column : pTable.getPKColumns()) {
+              int position = column.getPosition();
+              SortOrder sortOrder = column.getSortOrder();
+              pkPositions.add(position);
+              fieldCollations.add(new RelFieldCollation(position, sortOrder == 
SortOrder.ASC ? Direction.ASCENDING : Direction.DESCENDING));
+          }
       }
       this.pkBitSet = ImmutableBitSet.of(pkPositions);
       this.collation = 
RelCollationTraitDef.INSTANCE.canonize(RelCollations.of(fieldCollations));
@@ -128,9 +132,9 @@ public class PhoenixTable extends AbstractTable implements 
TranslatableTable {
 
             @Override
             public List<RelCollation> getCollations() {
-                return pTable.getBucketNum() == null ? 
-                        ImmutableList.<RelCollation> of(collation)
-                      : ImmutableList.<RelCollation>of();
+                return collation.getFieldCollations().isEmpty() ? 
+                        ImmutableList.<RelCollation>of()
+                      : ImmutableList.<RelCollation>of(collation);
             }
 
             @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/4060f3bc/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableScan.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableScan.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableScan.java
index bef650d..567ddc0 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableScan.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableScan.java
@@ -46,10 +46,10 @@ import org.apache.phoenix.schema.PName;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.SchemaUtil;
 
 import com.google.common.base.Supplier;
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 
 /**
@@ -67,10 +67,7 @@ public class PhoenixTableScan extends TableScan implements 
PhoenixRel {
                 .replaceIfs(RelCollationTraitDef.INSTANCE,
                         new Supplier<List<RelCollation>>() {
                     public List<RelCollation> get() {
-                        if (table != null) {
-                            return 
table.unwrap(PhoenixTable.class).getStatistic().getCollations();
-                        }
-                        return ImmutableList.of();
+                        return 
table.unwrap(PhoenixTable.class).getStatistic().getCollations();
                     }
                 });
         return new PhoenixTableScan(cluster, traits, table, filter);
@@ -149,6 +146,19 @@ public class PhoenixTableScan extends TableScan implements 
PhoenixRel {
         } else if (table.unwrap(PhoenixTable.class).getTable().getParentName() 
!= null){
             rowCount = addEpsilon(rowCount);
         }
+        if (requireRowKeyOrder()) {
+            // We don't want to make a big difference here. The idea is to 
avoid
+            // forcing row key order whenever the order is absolutely useless.
+            // E.g. in "select count(*) from t" we do not need the row key 
order;
+            // while in "select * from t order by pk0" we should force row key
+            // order to avoid sorting.
+            // Another case is "select pk0, count(*) from t", where we'd like 
to
+            // choose the row key ordered TableScan rel so that the Aggregate 
rel
+            // above it can be an stream aggregate, although at runtime this 
will
+            // eventually be an AggregatePlan, in which the "forceRowKeyOrder"
+            // flag takes no effect.
+            rowCount = addEpsilon(rowCount);
+        }
         int fieldCount = this.table.getRowType().getFieldCount();
         return planner.getCostFactory()
                 .makeCost(rowCount * 2 * fieldCount / (fieldCount + 1), 
rowCount + 1, 0)
@@ -202,6 +212,9 @@ public class PhoenixTableScan extends TableScan implements 
PhoenixRel {
             Integer limit = null;
             OrderBy orderBy = OrderBy.EMPTY_ORDER_BY;
             ParallelIteratorFactory iteratorFactory = null;
+            if (requireRowKeyOrder()) {
+                ScanUtil.setForceRowKeyOrder(context.getScan());
+            }
             return new ScanPlan(context, select, tableRef, 
RowProjector.EMPTY_PROJECTOR, limit, orderBy, iteratorFactory, true, 
dynamicFilter);
         } catch (SQLException e) {
             throw new RuntimeException(e);
@@ -234,6 +247,10 @@ public class PhoenixTableScan extends TableScan implements 
PhoenixRel {
             }
         }
     }
+    
+    private boolean requireRowKeyOrder() {
+        return table.unwrap(PhoenixTable.class).requireRowKeyOrder;
+    }
 
     private double addEpsilon(double d) {
       assert d >= 0d;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/4060f3bc/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java 
b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
index 7b76a2b..c80d9c5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
@@ -84,6 +84,8 @@ public class ScanUtil {
         Arrays.fill(MAX_FILL_LENGTH_FOR_PREVIOUS_KEY, (byte)-1);
     }
     private static final byte[] ZERO_BYTE_ARRAY = new byte[1024];
+    
+    private static final String FORCE_ROW_KEY_ORDER = "_ForceRowKeyOrder";
 
     private ScanUtil() {
     }
@@ -734,9 +736,14 @@ public class ScanUtil {
         return fetchSize > 1 && !shouldRowsBeInRowKeyOrder(orderBy, context) 
&& orderBy.getOrderByExpressions().isEmpty();
     }
     
+    public static void setForceRowKeyOrder(Scan scan) {
+        scan.setAttribute(FORCE_ROW_KEY_ORDER, 
Bytes.toBytes(Boolean.TRUE.toString()));
+    }
+    
     public static boolean forceRowKeyOrder(StatementContext context) {
-        return context.getConnection().getQueryServices().getProps()
-                .getBoolean(QueryServices.FORCE_ROW_KEY_ORDER_ATTRIB, 
QueryServicesOptions.DEFAULT_FORCE_ROW_KEY_ORDER);
+        return context.getScan().getAttribute(FORCE_ROW_KEY_ORDER) != null
+                && context.getConnection().getQueryServices().getProps()
+                    .getBoolean(QueryServices.FORCE_ROW_KEY_ORDER_ATTRIB, 
QueryServicesOptions.DEFAULT_FORCE_ROW_KEY_ORDER);
     }
     
     public static boolean shouldRowsBeInRowKeyOrder(OrderBy orderBy, 
StatementContext context) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/4060f3bc/phoenix-core/src/test/java/org/apache/phoenix/calcite/ToExpressionTest.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/calcite/ToExpressionTest.java 
b/phoenix-core/src/test/java/org/apache/phoenix/calcite/ToExpressionTest.java
index 3734b4c..bd239bd 100644
--- 
a/phoenix-core/src/test/java/org/apache/phoenix/calcite/ToExpressionTest.java
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/calcite/ToExpressionTest.java
@@ -165,7 +165,7 @@ public class ToExpressionTest extends 
BaseConnectionlessQueryTest {
                 return null;
             
             PTable table = rootTables.get(name);
-            return new PhoenixTable(pc, table);
+            return new PhoenixTable(pc, table, true);
         }
 
         @Override

Reply via email to