Repository: phoenix
Updated Branches:
  refs/heads/calcite 413247da5 -> 9252f64d6


PHOENIX-2416 Implement multi-tenant tables in Phoenix/Calcite integration


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/9252f64d
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/9252f64d
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/9252f64d

Branch: refs/heads/calcite
Commit: 9252f64d6dd49ada022cc52954931256f3e109f8
Parents: 413247d
Author: maryannxue <[email protected]>
Authored: Fri Nov 20 20:39:14 2015 -0500
Committer: maryannxue <[email protected]>
Committed: Fri Nov 20 20:39:14 2015 -0500

----------------------------------------------------------------------
 .../org/apache/phoenix/calcite/CalciteIT.java   | 146 +++++++++++--------
 .../apache/phoenix/calcite/PhoenixSchema.java   |  45 +++++-
 .../apache/phoenix/calcite/PhoenixTable.java    |  39 +++--
 .../apache/phoenix/calcite/rel/PhoenixRel.java  |   1 +
 .../calcite/rel/PhoenixRelImplementorImpl.java  |  40 +++--
 .../phoenix/calcite/rel/PhoenixTableScan.java   |  23 +--
 .../compile/TupleProjectionCompiler.java        |   4 +-
 .../apache/phoenix/execute/RuntimeContext.java  |   3 +-
 .../phoenix/execute/RuntimeContextImpl.java     |  17 ++-
 9 files changed, 200 insertions(+), 118 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/9252f64d/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteIT.java
index 8daf255..6ba0bd6 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteIT.java
@@ -124,7 +124,7 @@ public class CalciteIT extends BaseClientManagedTimeIT {
             }
         }
 
-        public Sql explainIs(String expected) {
+        public Sql explainIs(String expected) throws SQLException {
             final List<Object[]> list = getResult("explain plan for " + sql);
             if (list.size() != 1) {
                 fail("explain should return 1 row, got " + list.size());
@@ -135,52 +135,40 @@ public class CalciteIT extends BaseClientManagedTimeIT {
         }
 
 
-        public boolean execute() {
-            try {
-                final Statement statement = 
start.getConnection().createStatement();
-                final boolean execute = statement.execute(sql);
-                statement.close();
-                return execute;
-            } catch (SQLException e) {
-                throw new RuntimeException(e);
-            }
+        public boolean execute() throws SQLException {
+            final Statement statement = 
start.getConnection().createStatement();
+            final boolean execute = statement.execute(sql);
+            statement.close();
+            return execute;
         }
 
-        public List<Object[]> getResult(String sql) {
-            try {
-                final Statement statement = 
start.getConnection().createStatement();
-                final ResultSet resultSet = statement.executeQuery(sql);
-                List<Object[]> list = getResult(resultSet);
-                resultSet.close();
-                statement.close();
-                return list;
-            } catch (SQLException e) {
-                throw new RuntimeException(e);
-            }
+        public List<Object[]> getResult(String sql) throws SQLException {
+            final Statement statement = 
start.getConnection().createStatement();
+            final ResultSet resultSet = statement.executeQuery(sql);
+            List<Object[]> list = getResult(resultSet);
+            resultSet.close();
+            statement.close();
+            return list;
         }
 
         public void close() {
             start.close();
         }
 
-        public Sql resultIs(Object[]... expected) {
-            try {
-                final Statement statement = 
start.getConnection().createStatement();
-                final ResultSet resultSet = statement.executeQuery(sql);
-                for (int i = 0; i < expected.length; i++) {
-                    assertTrue(resultSet.next());
-                    Object[] row = expected[i];
-                    for (int j = 0; j < row.length; j++) {
-                        assertEquals(row[j], resultSet.getObject(j + 1));
-                    }
-                }        
-                assertFalse(resultSet.next());
-                resultSet.close();
-                statement.close();
-                return this;
-            } catch (SQLException e) {
-                throw new RuntimeException(e);
-            }
+        public Sql resultIs(Object[]... expected) throws SQLException {
+            final Statement statement = 
start.getConnection().createStatement();
+            final ResultSet resultSet = statement.executeQuery(sql);
+            for (int i = 0; i < expected.length; i++) {
+                assertTrue(resultSet.next());
+                Object[] row = expected[i];
+                for (int j = 0; j < row.length; j++) {
+                    assertEquals(row[j], resultSet.getObject(j + 1));
+                }
+            }        
+            assertFalse(resultSet.next());
+            resultSet.close();
+            statement.close();
+            return this;
         }
     }
 
@@ -397,6 +385,7 @@ public class CalciteIT extends BaseClientManagedTimeIT {
     }
     
     protected static final String MULTI_TENANT_TABLE = 
"multitenant_test_table";
+    protected static final String MULTI_TENANT_VIEW = "multitenant_test_view";
     
     protected void initMultiTenantTables() throws SQLException {
         Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
@@ -423,6 +412,13 @@ public class CalciteIT extends BaseClientManagedTimeIT {
             stmt.setInt(4, 6);
             stmt.execute();
             conn.commit();
+            
+            conn.close();
+            props.setProperty("TenantId", "10");
+            conn = DriverManager.getConnection(getUrl(), props);
+            conn.createStatement().execute("CREATE VIEW " + MULTI_TENANT_VIEW
+                    + " AS select * from " + MULTI_TENANT_TABLE);
+            conn.commit();
         } catch (TableAlreadyExistsException e) {
         } finally {
             conn.close();
@@ -665,7 +661,7 @@ public class CalciteIT extends BaseClientManagedTimeIT {
                 .close();
     }
     
-    @Test public void testAggregate() {
+    @Test public void testAggregate() throws Exception {
         start(false).sql("select count(b_string) from atable")
                 .explainIs("PhoenixToEnumerableConverter\n" +
                            "  PhoenixServerAggregate(group=[{}], 
EXPR$0=[COUNT($3)])\n" +
@@ -788,7 +784,7 @@ public class CalciteIT extends BaseClientManagedTimeIT {
                 .close();
     }
     
-    @Test public void testDistinct() {
+    @Test public void testDistinct() throws Exception {
         start(false).sql("select distinct a_string from aTable")
                 .explainIs("PhoenixToEnumerableConverter\n" +
                            "  PhoenixServerAggregate(group=[{2}], 
isOrdered=[false])\n" +
@@ -800,7 +796,7 @@ public class CalciteIT extends BaseClientManagedTimeIT {
                 .close();
     }
     
-    @Test public void testSort() {
+    @Test public void testSort() throws Exception {
         start(false).sql("select organization_id, entity_id, a_string from 
aTable order by a_string, entity_id")
                 .explainIs("PhoenixToEnumerableConverter\n" +
                            "  PhoenixServerSort(sort0=[$2], sort1=[$1], 
dir0=[ASC], dir1=[ASC])\n" +
@@ -930,7 +926,7 @@ public class CalciteIT extends BaseClientManagedTimeIT {
                 .close();
     }
     
-    @Test public void testSortWithLimit() {
+    @Test public void testSortWithLimit() throws Exception {
         start(false).sql("select organization_id, entity_id, a_string from 
aTable order by a_string, entity_id limit 5")
                 .explainIs("PhoenixToEnumerableConverter\n" +
                            "  PhoenixLimit(fetch=[5])\n" +
@@ -1043,7 +1039,7 @@ public class CalciteIT extends BaseClientManagedTimeIT {
                 .close();
     }
     
-    @Test public void testLimit() {
+    @Test public void testLimit() throws Exception {
         start(false).sql("select organization_id, entity_id, a_string from 
aTable limit 5")
                 .explainIs("PhoenixToEnumerableConverter\n" +
                            "  PhoenixLimit(fetch=[5])\n" +
@@ -1107,7 +1103,7 @@ public class CalciteIT extends BaseClientManagedTimeIT {
                 .close();
     }
 
-    @Test public void testScalarSubquery() {
+    @Test public void testScalarSubquery() throws Exception {
         start(false).sql("select \"item_id\", name, (select max(quantity) sq 
\n"
             + "from " + JOIN_ORDER_TABLE_FULL_NAME + " o where o.\"item_id\" = 
i.\"item_id\")\n"
             + "from " + JOIN_ITEM_TABLE_FULL_NAME + " i")
@@ -1151,7 +1147,7 @@ public class CalciteIT extends BaseClientManagedTimeIT {
                .close();;
     }
     
-    @Test public void testIndex() {
+    @Test public void testIndex() throws Exception {
         start(true).sql("select * from aTable where b_string = 'b'")
             .explainIs("PhoenixToEnumerableConverter\n" +
                        "  PhoenixServerProject(ORGANIZATION_ID=[$1], 
ENTITY_ID=[$2], A_STRING=[$3], B_STRING=[$0], A_INTEGER=[$4], A_DATE=[$5], 
A_TIME=[$6], A_TIMESTAMP=[$7], X_DECIMAL=[$8], X_LONG=[$9], X_INTEGER=[$10], 
Y_INTEGER=[$11], A_BYTE=[$12], A_SHORT=[$13], A_FLOAT=[$14], A_DOUBLE=[$15], 
A_UNSIGNED_FLOAT=[$16], A_UNSIGNED_DOUBLE=[$17])\n" +
@@ -1200,7 +1196,7 @@ public class CalciteIT extends BaseClientManagedTimeIT {
             .close();
     }
     
-    @Test public void testValues() {
+    @Test public void testValues() throws Exception {
         start(false).sql("select p0+p1 from (values (2, 1)) as t(p0, p1)")
             .explainIs("PhoenixToEnumerableConverter\n" +
                        "  PhoenixClientProject(EXPR$0=[+($0, $1)])\n" +
@@ -1214,7 +1210,7 @@ public class CalciteIT extends BaseClientManagedTimeIT {
             .close();
     }
     
-    @Test public void testUnion() {
+    @Test public void testUnion() throws Exception {
         start(false).sql("select entity_id from atable where a_string = 'a' 
union all select entity_id from atable where a_string = 'b'")
                 .explainIs("PhoenixToEnumerableConverter\n" +
                            "  PhoenixUnion(all=[true])\n" +
@@ -1269,7 +1265,7 @@ public class CalciteIT extends BaseClientManagedTimeIT {
                 .close();
     }
     
-    @Test public void testUnnest() {
+    @Test public void testUnnest() throws Exception {
         start(false).sql("SELECT t.s FROM UNNEST((SELECT scores FROM " + 
SCORES_TABLE_NAME + ")) AS t(s)")
                 .explainIs("PhoenixToEnumerableConverter\n" +
                            "  PhoenixUncollect\n" +
@@ -1301,7 +1297,7 @@ public class CalciteIT extends BaseClientManagedTimeIT {
                 .close();
     }
     
-    @Test public void testCorrelateAndDecorrelation() {
+    @Test public void testCorrelateAndDecorrelation() throws Exception {
         Properties correlProps = getConnectionProps(false);
         correlProps.setProperty("forceDecorrelate", Boolean.FALSE.toString());
         Properties decorrelProps = getConnectionProps(false);
@@ -1470,7 +1466,7 @@ public class CalciteIT extends BaseClientManagedTimeIT {
         
start(decorrelProps).sql(q6).explainIs(p6Decorrelated).resultIs(r6).close();
     }
     
-    @Test public void testInValueList() {
+    @Test public void testInValueList() throws Exception {
         start(false).sql("select entity_id from aTable where organization_id = 
'00D300000000XHP' and entity_id in ('00A123122312312', '00A223122312312', 
'00B523122312312', '00B623122312312', '00C923122312312')")
             .explainIs("PhoenixToEnumerableConverter\n" +
                        "  PhoenixServerProject(ENTITY_ID=[$1])\n" +
@@ -1484,7 +1480,7 @@ public class CalciteIT extends BaseClientManagedTimeIT {
             .close();
     }
     
-    @Test public void testSelectFromView() {
+    @Test public void testSelectFromView() throws Exception {
         start(false).sql("select * from v")
                 .explainIs("PhoenixToEnumerableConverter\n" +
                            "  PhoenixTableScan(table=[[phoenix, ATABLE]], 
filter=[=($2, 'a')])\n")
@@ -1496,7 +1492,7 @@ public class CalciteIT extends BaseClientManagedTimeIT {
                 .close();
     }
     
-    @Test public void testSaltedIndex() {
+    @Test public void testSaltedIndex() throws Exception {
         start(true).sql("select count(*) from " + NOSALT_TABLE_NAME + " where 
col0 > 3")
                 .explainIs("PhoenixToEnumerableConverter\n" +
                            "  PhoenixServerAggregate(group=[{}], 
EXPR$0=[COUNT()])\n" +
@@ -1557,8 +1553,25 @@ public class CalciteIT extends BaseClientManagedTimeIT {
                 .close();
     }
     
-    @Test public void testMultiTenant() {
+    @Test public void testMultiTenant() throws Exception {
         Properties props = getConnectionProps(false);
+        start(props).sql("select * from " + MULTI_TENANT_TABLE)
+                .explainIs("PhoenixToEnumerableConverter\n" +
+                           "  PhoenixTableScan(table=[[phoenix, 
MULTITENANT_TEST_TABLE]])\n")
+                .resultIs(new Object[][] {
+                        {"10", "2", 3, 4},
+                        {"15", "3", 4, 5},
+                        {"20", "4", 5, 6}})
+                .close();
+        
+        try {
+            start(props).sql("select * from " + MULTI_TENANT_VIEW)
+                .explainIs("")
+                .close();
+            fail("Should have got SQLException.");
+        } catch (SQLException e) {
+        }
+        
         props.setProperty("TenantId", "15");
         start(props).sql("select * from " + MULTI_TENANT_TABLE)
                 .explainIs("PhoenixToEnumerableConverter\n" +
@@ -1566,19 +1579,36 @@ public class CalciteIT extends BaseClientManagedTimeIT {
                 .resultIs(new Object[][] {
                         {"3", 4, 5}})
                 .close();
+        
+        try {
+            start(props).sql("select * from " + MULTI_TENANT_VIEW)
+                .explainIs("")
+                .close();
+            fail("Should have got SQLException.");
+        } catch (SQLException e) {
+        }
+
+        props.setProperty("TenantId", "10");
+        start(props).sql("select * from " + MULTI_TENANT_VIEW)
+                .explainIs("PhoenixToEnumerableConverter\n" +
+                           "  PhoenixTableScan(table=[[phoenix, 
MULTITENANT_TEST_TABLE]])\n")
+                .resultIs(new Object[][] {
+                        {"2", 3, 4}})
+                .close();
     }
 
-    /** Tests a simple command that is defined in Phoenix's extended SQL 
parser. */
+    /** Tests a simple command that is defined in Phoenix's extended SQL 
parser. 
+     * @throws Exception */
     @Ignore
-    @Test public void testCommit() {
+    @Test public void testCommit() throws Exception {
         start(false).sql("commit").execute();
     }
 
-    @Test public void testCreateView() {
+    @Test public void testCreateView() throws Exception {
         start(false).sql("create view v as select * from (values (1, 'a'), (2, 
'b')) as t(x, y)").execute();
     }
 
-    @Test public void testConnectJoinHsqldb() {
+    @Test public void testConnectJoinHsqldb() throws Exception {
         final Start start = new Start(getConnectionProps(false)) {
             @Override
             Connection createConnection() throws Exception {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/9252f64d/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSchema.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSchema.java 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSchema.java
index 40afe5f..ef14b45 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSchema.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSchema.java
@@ -21,6 +21,7 @@ import org.apache.phoenix.schema.MetaDataClient;
 import org.apache.phoenix.schema.PColumn;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTable.ViewType;
+import org.apache.phoenix.schema.PTableImpl;
 import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.util.IndexUtil;
@@ -81,7 +82,9 @@ public class PhoenixSchema implements Schema {
             Set<String> subSchemaNames = Sets.newHashSet();
             while (rs.next()) {
                 String schemaName = 
rs.getString(PhoenixDatabaseMetaData.TABLE_SCHEM);
-                subSchemaNames.add(schemaName == null ? "" : schemaName);
+                if (schemaName != null) {
+                    subSchemaNames.add(schemaName);
+                }
             }
             return subSchemaNames;
         } catch (SQLException e) {
@@ -106,16 +109,47 @@ public class PhoenixSchema implements Schema {
                                     ImmutableList.<ColumnDef>of()), pc);
                     final List<TableRef> tables = x.getTables();
                     assert tables.size() == 1;
-                    tableMap.put(tableName, tables.get(0).getTable());
+                    PTable pTable = tables.get(0).getTable();
+                    if (pc.getTenantId() == null && pTable.isMultiTenant()) {
+                        pTable = fixTableMultiTenancy(pTable);
+                    }
+                    tableMap.put(tableName, pTable);
                 } else {
-                    String viewSql = 
rs.getString(PhoenixDatabaseMetaData.VIEW_STATEMENT);
-                    viewDefMap.put(tableName, new ViewDef(viewSql, 
viewType.equals(ViewType.UPDATABLE.name())));
+                    boolean isMultiTenant = 
rs.getBoolean(PhoenixDatabaseMetaData.MULTI_TENANT);
+                    if (pc.getTenantId() != null || !isMultiTenant) {
+                        String viewSql = 
rs.getString(PhoenixDatabaseMetaData.VIEW_STATEMENT);
+                        if (viewSql == null) {
+                            String q = "select " + 
PhoenixDatabaseMetaData.COLUMN_FAMILY
+                                    + " from " + 
PhoenixDatabaseMetaData.SYSTEM_CATALOG
+                                    + " where " + 
PhoenixDatabaseMetaData.TABLE_SCHEM
+                                    + (schemaName == null ? " is null" : " = 
'" + schemaName + "'")
+                                    + " and " + 
PhoenixDatabaseMetaData.TABLE_NAME
+                                    + " = '" + tableName + "'"
+                                    + " and " + 
PhoenixDatabaseMetaData.COLUMN_FAMILY + " is not null";
+                            ResultSet rs2 = 
pc.createStatement().executeQuery(q);
+                            if (!rs2.next()) {
+                                throw new SQLException("View link not found 
for " + tableName);
+                            }
+                            String parentTableName = 
rs2.getString(PhoenixDatabaseMetaData.COLUMN_FAMILY);
+                            viewSql = "select * from " + parentTableName;
+                        }
+                        viewDefMap.put(tableName, new ViewDef(viewSql, 
viewType.equals(ViewType.UPDATABLE.name())));
+                    }
                 }
             }
         } catch (SQLException e) {
             throw new RuntimeException(e);
         }
     }
+    
+    private PTable fixTableMultiTenancy(PTable table) throws SQLException {
+        return PTableImpl.makePTable(
+                table.getTenantId(), table.getSchemaName(), 
table.getTableName(), table.getType(), table.getIndexState(), 
table.getTimeStamp(),
+                table.getSequenceNumber(), table.getPKName(), 
table.getBucketNum(), PTableImpl.getColumnsToClone(table), 
table.getParentSchemaName(), table.getParentTableName(),
+                table.getIndexes(), table.isImmutableRows(), 
table.getPhysicalNames(), table.getDefaultFamilyName(), 
table.getViewStatement(),
+                table.isWALDisabled(), false, table.getStoreNulls(), 
table.getViewType(), table.getViewIndexId(), table.getIndexType(),
+                table.getTableStats(), table.getBaseColumnCount(), 
table.rowKeyOrderOptimizable());
+    }
 
     private static Schema create(String name, Map<String, Object> operand) {
         String url = (String) operand.get("url");
@@ -210,8 +244,7 @@ public class PhoenixSchema implements Schema {
             CalciteSchema calciteSchema) {
         StringBuffer sb = new StringBuffer();
         sb.append("SELECT");
-        for (int i = PhoenixTable.getStartingColumnPosition(index); i < 
index.getColumns().size(); i++) {
-            PColumn column = index.getColumns().get(i);
+        for (PColumn column : PhoenixTable.getMappedColumns(index)) {
             String indexColumnName = column.getName().getString();
             String dataColumnName = 
IndexUtil.getDataColumnName(indexColumnName);
             sb.append(",").append("\"").append(dataColumnName).append("\"");

http://git-wip-us.apache.org/repos/asf/phoenix/blob/9252f64d/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTable.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTable.java 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTable.java
index df6e338..22d4e68 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTable.java
@@ -37,25 +37,44 @@ import com.google.common.collect.Lists;
  */
 public class PhoenixTable extends AbstractTable implements TranslatableTable {
   public final PTable pTable;
+  public final List<PColumn> mappedColumns;
   public final ImmutableBitSet pkBitSet;
   public final RelCollation collation;
   public final PhoenixConnection pc;
   
-  public static int getStartingColumnPosition(PTable pTable) {
-      return (pTable.getBucketNum() == null ? 0 : 1) + (pTable.isMultiTenant() 
? 1 : 0) + (pTable.getViewIndexId() == null ? 0 : 1);
+  public static List<PColumn> getMappedColumns(PTable pTable) {
+      if (pTable.getBucketNum() == null
+              && !pTable.isMultiTenant()
+              && pTable.getViewIndexId() == null) {
+          return pTable.getColumns();
+      }
+      
+      List<PColumn> columns = Lists.newArrayList(pTable.getColumns());
+      if (pTable.getViewIndexId() != null) {
+          columns.remove((pTable.getBucketNum() == null ? 0 : 1) + 
(pTable.isMultiTenant() ? 1 : 0));
+      }
+      if (pTable.isMultiTenant()) {
+          columns.remove(pTable.getBucketNum() == null ? 0 : 1);
+      }
+      if (pTable.getBucketNum() != null) {
+          columns.remove(0);
+      }
+      return columns;
   }
 
   public PhoenixTable(PhoenixConnection pc, PTable pTable) {
       this.pc = Preconditions.checkNotNull(pc);
       this.pTable = Preconditions.checkNotNull(pTable);
+      this.mappedColumns = getMappedColumns(pTable);
       List<Integer> pkPositions = Lists.<Integer> newArrayList();
       List<RelFieldCollation> fieldCollations = Lists.<RelFieldCollation> 
newArrayList();
-      int start = getStartingColumnPosition(pTable);
-      for (PColumn column : pTable.getPKColumns().subList(start, 
pTable.getPKColumns().size())) {
-          int position = column.getPosition() - start;
-          SortOrder sortOrder = column.getSortOrder();
-          pkPositions.add(position);
-          fieldCollations.add(new RelFieldCollation(position, sortOrder == 
SortOrder.ASC ? Direction.ASCENDING : Direction.DESCENDING));
+      for (int i = 0; i < mappedColumns.size(); i++) {
+          PColumn column = mappedColumns.get(i);
+          if (SchemaUtil.isPKColumn(column)) {
+              SortOrder sortOrder = column.getSortOrder();
+              pkPositions.add(i);
+              fieldCollations.add(new RelFieldCollation(i, sortOrder == 
SortOrder.ASC ? Direction.ASCENDING : Direction.DESCENDING));
+          }
       }
       this.pkBitSet = ImmutableBitSet.of(pkPositions);
       this.collation = 
RelCollationTraitDef.INSTANCE.canonize(RelCollations.of(fieldCollations));
@@ -69,8 +88,8 @@ public class PhoenixTable extends AbstractTable implements 
TranslatableTable {
     @Override
     public RelDataType getRowType(RelDataTypeFactory typeFactory) {
         final RelDataTypeFactory.FieldInfoBuilder builder = 
typeFactory.builder();
-        for (int i = getStartingColumnPosition(pTable); i < 
pTable.getColumns().size(); i++) {
-            PColumn pColumn = pTable.getColumns().get(i);
+        for (int i = 0; i < mappedColumns.size(); i++) {
+            PColumn pColumn = mappedColumns.get(i);
             final PDataType baseType = 
                     pColumn.getDataType().isArrayType() ?
                             
PDataType.fromTypeId(pColumn.getDataType().getSqlType() - 
PDataType.ARRAY_TYPE_BASE) 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/9252f64d/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRel.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRel.java 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRel.java
index f5e04be..92d8ad0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRel.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRel.java
@@ -73,6 +73,7 @@ public interface PhoenixRel extends RelNode {
     ImplementorContext popContext();
     ImplementorContext getCurrentContext();
     PTable createProjectedTable();
+    TupleProjector createTupleProjector();
     RowProjector createRowProjector();
     TupleProjector project(List<Expression> exprs);
   }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/9252f64d/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRelImplementorImpl.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRelImplementorImpl.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRelImplementorImpl.java
index ad5fde7..cd6f599 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRelImplementorImpl.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRelImplementorImpl.java
@@ -31,13 +31,16 @@ import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTableImpl;
 import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.schema.KeyValueSchema.KeyValueSchemaBuilder;
 import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.util.SchemaUtil;
 
 import com.google.common.collect.Lists;
 
 public class PhoenixRelImplementorImpl implements PhoenixRel.Implementor {
     private final RuntimeContext runtimeContext;
        private TableRef tableRef;
+       private List<PColumn> mappedColumns;
        private Stack<ImplementorContext> contextStack;
        
        public PhoenixRelImplementorImpl(RuntimeContext runtimeContext) {
@@ -52,17 +55,14 @@ public class PhoenixRelImplementorImpl implements 
PhoenixRel.Implementor {
 
        @Override
        public ColumnExpression newColumnExpression(int index) {
-           int pos = index + 
PhoenixTable.getStartingColumnPosition(this.tableRef.getTable());
-               ColumnRef colRef = new ColumnRef(this.tableRef, pos);
+               ColumnRef colRef = new ColumnRef(this.tableRef, 
this.mappedColumns.get(index).getPosition());
                return colRef.newColumnExpression();
        }
     
     @SuppressWarnings("rawtypes")
     @Override
     public Expression newFieldAccessExpression(String variableId, int index, 
PDataType type) {
-        TableRef variableDef = 
runtimeContext.getCorrelateVariableDef(variableId);
-        int pos = index + 
PhoenixTable.getStartingColumnPosition(variableDef.getTable());
-        Expression fieldAccessExpr = new ColumnRef(variableDef, 
pos).newColumnExpression();
+        Expression fieldAccessExpr = 
runtimeContext.newCorrelateVariableReference(variableId, index);
         return new CorrelateVariableFieldAccessExpression(runtimeContext, 
variableId, fieldAccessExpr);
     }
     
@@ -74,6 +74,7 @@ public class PhoenixRelImplementorImpl implements 
PhoenixRel.Implementor {
     @Override
        public void setTableRef(TableRef tableRef) {
                this.tableRef = tableRef;
+               this.mappedColumns = 
PhoenixTable.getMappedColumns(tableRef.getTable());
        }
     
     @Override
@@ -99,9 +100,10 @@ public class PhoenixRelImplementorImpl implements 
PhoenixRel.Implementor {
     @Override
     public PTable createProjectedTable() {
         List<ColumnRef> sourceColumnRefs = Lists.<ColumnRef> newArrayList();
-        int start = getCurrentContext().retainPKColumns ? 0 : 
PhoenixTable.getStartingColumnPosition(getTableRef().getTable());
-        for (int i = start; i < getTableRef().getTable().getColumns().size(); 
i++) {
-            sourceColumnRefs.add(new ColumnRef(getTableRef(), 
getTableRef().getTable().getColumns().get(i).getPosition()));
+        List<PColumn> columns = getCurrentContext().retainPKColumns ?
+                  getTableRef().getTable().getColumns() : mappedColumns;
+        for (PColumn column : columns) {
+            sourceColumnRefs.add(new ColumnRef(getTableRef(), 
column.getPosition()));
         }
         
         try {
@@ -112,12 +114,26 @@ public class PhoenixRelImplementorImpl implements 
PhoenixRel.Implementor {
     }
     
     @Override
+    public TupleProjector createTupleProjector() {
+        KeyValueSchemaBuilder builder = new KeyValueSchemaBuilder(0);
+        List<Expression> exprs = Lists.<Expression> newArrayList();
+        for (PColumn column : mappedColumns) {
+            if (!SchemaUtil.isPKColumn(column) || 
!getCurrentContext().retainPKColumns) {
+                Expression expr = new ColumnRef(tableRef, 
column.getPosition()).newColumnExpression();
+                exprs.add(expr);
+                builder.addField(expr);                
+            }
+        }
+        
+        return new TupleProjector(builder.build(), exprs.toArray(new 
Expression[exprs.size()]));
+    }
+    
+    @Override
     public RowProjector createRowProjector() {
         List<ColumnProjector> columnProjectors = 
Lists.<ColumnProjector>newArrayList();
-        int start = 
PhoenixTable.getStartingColumnPosition(getTableRef().getTable());
-        for (int i = start; i < getTableRef().getTable().getColumns().size(); 
i++) {
-            PColumn column = getTableRef().getTable().getColumns().get(i);
-            Expression expr = newColumnExpression(i - start); // Do not use 
column.position() here.
+        for (int i = 0; i < mappedColumns.size(); i++) {
+            PColumn column = mappedColumns.get(i);
+            Expression expr = newColumnExpression(i); // Do not use 
column.position() here.
             columnProjectors.add(new 
ExpressionProjector(column.getName().getString(), 
getTableRef().getTable().getName().getString(), expr, false));
         }
         // TODO get estimate row size

http://git-wip-us.apache.org/repos/asf/phoenix/blob/9252f64d/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableScan.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableScan.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableScan.java
index 864b4ca..fcf15f9 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableScan.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableScan.java
@@ -41,19 +41,14 @@ import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.parse.SelectStatement;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
-import org.apache.phoenix.schema.ColumnRef;
-import org.apache.phoenix.schema.KeyValueSchema.KeyValueSchemaBuilder;
 import org.apache.phoenix.schema.PColumn;
 import org.apache.phoenix.schema.PName;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.types.PDataType;
-import org.apache.phoenix.util.SchemaUtil;
-
 import com.google.common.base.Supplier;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableList.Builder;
-import com.google.common.collect.Lists;
 
 /**
  * Scan of a Phoenix table.
@@ -257,7 +252,7 @@ public class PhoenixTableScan extends TableScan implements 
PhoenixRel {
             }
             projectColumnFamilies(context.getScan(), phoenixTable.getTable(), 
columnRefList);
             if (implementor.getCurrentContext().forceProject) {
-                TupleProjector tupleProjector = 
createTupleProjector(implementor);
+                TupleProjector tupleProjector = 
implementor.createTupleProjector();
                 TupleProjector.serializeProjectorIntoScan(context.getScan(), 
tupleProjector);
                 PTable projectedTable = implementor.createProjectedTable();
                 implementor.setTableRef(new TableRef(projectedTable));
@@ -275,22 +270,6 @@ public class PhoenixTableScan extends TableScan implements 
PhoenixRel {
         }
     }
     
-    private TupleProjector createTupleProjector(Implementor implementor) {
-        KeyValueSchemaBuilder builder = new KeyValueSchemaBuilder(0);
-        List<Expression> exprs = Lists.<Expression> newArrayList();
-        TableRef tableRef = implementor.getTableRef();
-        for (int i = 
PhoenixTable.getStartingColumnPosition(tableRef.getTable()); i < 
tableRef.getTable().getColumns().size(); i++) {
-            PColumn column = tableRef.getTable().getColumns().get(i);
-            if (!SchemaUtil.isPKColumn(column) || 
!implementor.getCurrentContext().retainPKColumns) {
-                Expression expr = new ColumnRef(tableRef, 
column.getPosition()).newColumnExpression();
-                exprs.add(expr);
-                builder.addField(expr);                
-            }
-        }
-        
-        return new TupleProjector(builder.build(), exprs.toArray(new 
Expression[exprs.size()]));
-    }
-    
     private void projectColumnFamilies(Scan scan, PTable table, 
ImmutableIntList columnRefList) {
         scan.getFamilyMap().clear();
         for (Integer index : columnRefList) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/9252f64d/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java
index c6aa546..ed0b335 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java
@@ -178,8 +178,8 @@ public class TupleProjectionCompiler {
                     null, table.getTimeStamp(), table.getSequenceNumber(), 
table.getPKName(),
                     retainPKColumns ? table.getBucketNum() : null, 
projectedColumns, null,
                     null, Collections.<PTable>emptyList(), 
table.isImmutableRows(), Collections.<PName>emptyList(), null, null,
-                    table.isWALDisabled(), table.isMultiTenant(), 
table.getStoreNulls(), table.getViewType(), table.getViewIndexId(),
-                    null, table.rowKeyOrderOptimizable());
+                    table.isWALDisabled(), retainPKColumns ? 
table.isMultiTenant() : false, table.getStoreNulls(), table.getViewType(),
+                    retainPKColumns ? table.getViewIndexId() : null, null, 
table.rowKeyOrderOptimizable());
     }
 
     // For extracting column references from single select statement

http://git-wip-us.apache.org/repos/asf/phoenix/blob/9252f64d/phoenix-core/src/main/java/org/apache/phoenix/execute/RuntimeContext.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/RuntimeContext.java 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/RuntimeContext.java
index 89dd082..99f409e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/RuntimeContext.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/RuntimeContext.java
@@ -17,6 +17,7 @@
  */
 package org.apache.phoenix.execute;
 
+import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.tuple.Tuple;
 
@@ -24,7 +25,7 @@ public interface RuntimeContext {
 
     public abstract void defineCorrelateVariable(String variableId, TableRef 
def);
 
-    public abstract TableRef getCorrelateVariableDef(String variableId);
+    public abstract Expression newCorrelateVariableReference(String 
variableId, int index);
 
     public abstract void setCorrelateVariableValue(String variableId, Tuple 
value);
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/9252f64d/phoenix-core/src/main/java/org/apache/phoenix/execute/RuntimeContextImpl.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/RuntimeContextImpl.java 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/RuntimeContextImpl.java
index 6a1ba4a..0accea6 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/RuntimeContextImpl.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/RuntimeContextImpl.java
@@ -17,8 +17,13 @@
  */
 package org.apache.phoenix.execute;
 
+import java.util.List;
 import java.util.Map;
 
+import org.apache.phoenix.calcite.PhoenixTable;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.schema.ColumnRef;
+import org.apache.phoenix.schema.PColumn;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.tuple.Tuple;
 
@@ -37,12 +42,12 @@ public class RuntimeContextImpl implements RuntimeContext {
     }
     
     @Override
-    public TableRef getCorrelateVariableDef(String variableId) {
+    public Expression newCorrelateVariableReference(String variableId, int 
index) {
         VariableEntry entry = this.correlateVariables.get(variableId);
         if (entry == null)
             throw new RuntimeException("Variable '" + variableId + "' 
undefined.");
         
-        return entry.getDef();
+        return new ColumnRef(entry.def, 
entry.mappedColumns.get(index).getPosition()).newColumnExpression();
     }
     
     @Override
@@ -65,15 +70,13 @@ public class RuntimeContextImpl implements RuntimeContext {
     
     private static class VariableEntry {
         private final TableRef def;
+        private final List<PColumn> mappedColumns;
         private Tuple value;
         
         VariableEntry(TableRef def) {
             this.def = def;
-        }
-        
-        TableRef getDef() {
-            return def;
-        }
+            this.mappedColumns = PhoenixTable.getMappedColumns(def.getTable());
+        }        
         
         Tuple getValue() {
             return value;

Reply via email to