Repository: phoenix
Updated Branches:
  refs/heads/calcite 20462f03a -> d90870f5d


PHOENIX-3732 Support for dynamic columns in UPSERT in Phoenix-Calcite (Rajeshbabu)


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/d90870f5
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/d90870f5
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/d90870f5

Branch: refs/heads/calcite
Commit: d90870f5d64797c4258b9a7fb66967ef26b113d4
Parents: 20462f0
Author: Rajeshbabu Chintaguntla <[email protected]>
Authored: Wed Jun 28 15:22:57 2017 +0530
Committer: Rajeshbabu Chintaguntla <[email protected]>
Committed: Wed Jun 28 15:22:57 2017 +0530

----------------------------------------------------------------------
 .../apache/phoenix/end2end/DynamicUpsertIT.java |   4 +-
 .../phoenix/calcite/PhoenixSqlConformance.java  |   5 +
 .../apache/phoenix/calcite/PhoenixTable.java    | 132 ++++++++++++++++---
 .../apache/phoenix/calcite/TableMapping.java    |  29 +++-
 .../phoenix/calcite/rel/PhoenixTableModify.java |   9 +-
 5 files changed, 151 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/d90870f5/phoenix-core/src/it/java/org/apache/phoenix/end2end/DynamicUpsertIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/DynamicUpsertIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/DynamicUpsertIT.java
index 6f4af72..18513d7 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/DynamicUpsertIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/DynamicUpsertIT.java
@@ -200,7 +200,7 @@ public class DynamicUpsertIT extends 
ParallelStatsDisabledIT {
     /**
      * Test an upsert of two conflicting dynamic columns
      */
-    @Test(expected = ColumnAlreadyExistsException.class)
+    @Test
     public void testAmbiguousDynamicUpsert() throws Exception {
         String upsertquery = "UPSERT INTO " + tableName + " (a.DynCol 
VARCHAR,a.DynCol INTEGER) VALUES('dynCol',1)";
         String url = getUrl() + ";";
@@ -209,6 +209,8 @@ public class DynamicUpsertIT extends 
ParallelStatsDisabledIT {
         try {
             PreparedStatement statement = conn.prepareStatement(upsertquery);
             statement.execute();
+        } catch (SQLException e) {
+            
assertTrue(e.getCause().getCause().getMessage().contains("Duplicate name"));
         } finally {
             conn.close();
         }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d90870f5/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSqlConformance.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSqlConformance.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSqlConformance.java
index dee85db..77bd75e 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSqlConformance.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSqlConformance.java
@@ -85,4 +85,9 @@ public class PhoenixSqlConformance implements SqlConformance {
     public boolean isHavingAlias() {
         return true;
     }
+
+    @Override
+    public boolean allowExtend() {
+        return true;
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d90870f5/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTable.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTable.java 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTable.java
index 4175a0e..b40c37c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTable.java
@@ -2,6 +2,7 @@ package org.apache.phoenix.calcite;
 
 import java.io.IOException;
 import java.sql.SQLException;
+import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.calcite.plan.RelOptTable;
@@ -20,7 +21,9 @@ import org.apache.calcite.rel.type.RelDataTypeField;
 import org.apache.calcite.rex.RexBuilder;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.schema.CustomColumnResolvingTable;
+import org.apache.calcite.schema.ExtensibleTable;
 import org.apache.calcite.schema.Statistic;
+import org.apache.calcite.schema.Table;
 import org.apache.calcite.schema.TranslatableTable;
 import org.apache.calcite.schema.Wrapper;
 import org.apache.calcite.schema.impl.AbstractTable;
@@ -43,19 +46,26 @@ import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.parse.ColumnDef;
 import org.apache.phoenix.parse.NamedTableNode;
 import org.apache.phoenix.parse.TableName;
+import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.ColumnFamilyNotFoundException;
 import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PColumnImpl;
+import org.apache.phoenix.schema.PNameFactory;
 import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableImpl;
 import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.PTable.IndexType;
 import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.schema.stats.StatisticsUtil;
+import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.util.SchemaUtil;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 
 /**
@@ -63,7 +73,7 @@ import com.google.common.collect.Lists;
  * Phoenix.
  */
 public class PhoenixTable extends AbstractTable
-    implements TranslatableTable, CustomColumnResolvingTable, Wrapper {
+    implements ExtensibleTable, TranslatableTable, CustomColumnResolvingTable, 
Wrapper {
   public final TableMapping tableMapping;
   public final ImmutableBitSet pkBitSet;
   public final RelCollation collation;
@@ -73,6 +83,20 @@ public class PhoenixTable extends AbstractTable
   public final RelDataTypeFactory typeFactory;
   public final InitializerExpressionFactory initializerExpressionFactory;
 
+    private PhoenixTable(TableMapping tableMapping, ImmutableBitSet pkBitSet,
+            RelCollation collation, long byteCount, long rowCount, 
PhoenixConnection pc,
+            InitializerExpressionFactory initializerExpressionFactory,
+            RelDataTypeFactory typeFactory) {
+    this.tableMapping = tableMapping;
+    this.pkBitSet = pkBitSet;
+    this.collation = collation;
+    this.byteCount = byteCount;
+    this.rowCount = rowCount;
+    this.pc = pc;
+    this.initializerExpressionFactory = initializerExpressionFactory;
+    this.typeFactory = typeFactory;
+  }
+
   public PhoenixTable(PhoenixConnection pc, TableRef tableRef, final 
RelDataTypeFactory typeFactory) throws SQLException {
       this.pc = Preconditions.checkNotNull(pc);
       PTable pTable = tableRef.getTable();
@@ -118,22 +142,31 @@ public class PhoenixTable extends AbstractTable
                   byteCount = dataTableEstimatedCount.getSecond();
               }
           } else {
-              // TODO The props might not be the same as server props.
-              int guidepostPerRegion = pc.getQueryServices().getProps().getInt(
-                      QueryServices.STATS_GUIDEPOST_PER_REGION_ATTRIB,
-                      QueryServicesOptions.DEFAULT_STATS_GUIDEPOST_PER_REGION);
-              long guidepostWidth = pc.getQueryServices().getProps().getLong(
-                      QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB,
-                      
QueryServicesOptions.DEFAULT_STATS_GUIDEPOST_WIDTH_BYTES);
-              HTableDescriptor desc = null;
-              if (guidepostPerRegion > 0) {
-                  desc = pc.getQueryServices().getAdmin().getTableDescriptor(
-                          pTable.getPhysicalName().getBytes());
-              }
-              byteCount = StatisticsUtil.getGuidePostDepth(
+              if(pTable.getType() == PTableType.INDEX) {
+                  rowCount = 1;
+                  byteCount = 1;
+              } else {
+                  // TODO The props might not be the same as server props.
+                  int guidepostPerRegion = 
pc.getQueryServices().getProps().getInt(
+                          QueryServices.STATS_GUIDEPOST_PER_REGION_ATTRIB,
+                          
QueryServicesOptions.DEFAULT_STATS_GUIDEPOST_PER_REGION);
+                  long guidepostWidth = 
pc.getQueryServices().getProps().getLong(
+                          QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB,
+                          
QueryServicesOptions.DEFAULT_STATS_GUIDEPOST_WIDTH_BYTES);
+                  HTableDescriptor desc = null;
+                  if (guidepostPerRegion > 0) {
+                        desc =
+                                pc.getQueryServices()
+                                        .getAdmin()
+                                        .getTableDescriptor(
+                                            dataTable != null ? 
dataTable.getTable().getPhysicalName()
+                                                    .getBytes() : 
pTable.getPhysicalName().getBytes());
+                  }
+                  byteCount = StatisticsUtil.getGuidePostDepth(
                       guidepostPerRegion, guidepostWidth, desc) / 2;
-              long rowSize = SchemaUtil.estimateRowSize(pTable);
-              rowCount = byteCount / rowSize;
+                  long rowSize = SchemaUtil.estimateRowSize(dataTable != null 
? dataTable.getTable():pTable);
+                  rowCount = byteCount / rowSize;
+              }
           }
       } catch (SQLException | IOException e) {
           throw new RuntimeException(e);
@@ -242,4 +275,71 @@ public class PhoenixTable extends AbstractTable
                 rexBuilder);
         }
     }
+
+    @Override
+    public Table extend(List<RelDataTypeField> fields) {
+        final ImmutableList.Builder<PColumn> extendedColumns = 
ImmutableList.builder();
+        PTable table = tableMapping.getTableRef().getTable();
+        for (RelDataTypeField field : fields) {
+            final RelDataType colType = field.getType();
+            int index = field.getName().indexOf(QueryConstants.NAME_SEPARATOR);
+            String columnFamily = null;
+            String columnName = field.getName();
+            if (index > 0) {
+                columnFamily = field.getName().substring(0, index);
+                try {
+                    table.getColumnFamily(columnFamily);
+                } catch (ColumnFamilyNotFoundException e) {
+                    throw new RuntimeException(e);
+                }
+                columnName = field.getName().substring(index + 1);
+            } else {
+                columnFamily =
+                        
tableMapping.getTableRef().getTable().getDefaultFamilyName() == null ? 
QueryConstants.DEFAULT_COLUMN_FAMILY
+                                : 
tableMapping.getTableRef().getTable().getDefaultFamilyName()
+                                        .getString();
+            }
+            final PColumn column =
+                    new PColumnImpl(PNameFactory.newName(columnName),
+                            PNameFactory.newName(columnFamily), 
PDataType.fromSqlTypeName(field
+                                    .getType().getSqlTypeName().getName()),
+                            colType.getPrecision() == 
RelDataType.PRECISION_NOT_SPECIFIED ? null
+                                    : colType.getPrecision(),
+                            colType.getScale() == 
RelDataType.SCALE_NOT_SPECIFIED ? null : colType
+                                    .getScale(), colType.isNullable(),
+                            (table.getBucketNum() != null ? (field.getIndex() 
+ 1) : field
+                                    .getIndex()),
+                            SortOrder.ASC,// TODO: get this from metastore? 
test if specifying ASC
+                                          // or DESC is allowed. use Docker 
image
+                            colType.isStruct() ? 
field.getType().getFieldCount() : 0, null, false,
+                            null, false, true, null);
+            extendedColumns.add(column);
+        }
+
+        List<PColumn> tableColumns = table.getColumns();
+        if(table.getBucketNum()!=null) {
+            tableColumns = new ArrayList<PColumn>(table.getColumns().size()-1);
+            for(int i = 1; i < table.getColumns().size() ; i++) {
+                tableColumns.add(table.getColumns().get(i));
+            }
+        }
+        final List<PColumn> allColumns =
+                ImmutableList.copyOf(Iterables.concat(tableColumns, 
extendedColumns.build()));
+        try {
+            final PTable extendedTable =
+                    PTableImpl.makePTable(tableMapping.getPTable(), 
allColumns);
+            final TableMapping newMapping =
+                    new TableMapping(extendedTable, 
tableMapping.getExtendedColumnsOffset());
+            return new PhoenixTable(newMapping, pkBitSet, collation, 
byteCount, rowCount, pc,
+                    initializerExpressionFactory, typeFactory);
+        } catch (SQLException e) {
+            throw new RuntimeException("Could not create extended table", e);
+        }
+    }
+
+    @Override
+    public int getExtendedColumnOffset() {
+        return tableMapping.getExtendedColumnsOffset();
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d90870f5/phoenix-core/src/main/java/org/apache/phoenix/calcite/TableMapping.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/TableMapping.java 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/TableMapping.java
index 966d552..c146230 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/TableMapping.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/TableMapping.java
@@ -67,7 +67,7 @@ public class TableMapping {
     private final TableRef tableRef;
     private final TableRef dataTableRef;
     private final List<PColumn> mappedColumns;
-    private final int extendedColumnsOffset;
+    private int extendedColumnsOffset;
     private final TableRef extendedTableRef;
     // For column resolving
     private final List<String> names = Lists.newArrayList();
@@ -82,6 +82,11 @@ public class TableMapping {
         init();
     }
 
+    public TableMapping(PTable table, int extendedColumnsOffset) {
+        this(table);
+        this.extendedColumnsOffset = extendedColumnsOffset;
+    }
+
     public TableMapping(TableRef tableRef, TableRef dataTableRef, boolean 
extend) throws SQLException {
         this.tableRef = tableRef;
         this.dataTableRef = dataTableRef;
@@ -273,7 +278,7 @@ public class TableMapping {
     
     public Expression newColumnExpression(int index) {
         ColumnRef colRef = new ColumnRef(
-                index < extendedColumnsOffset ? tableRef : extendedTableRef,
+                index < extendedColumnsOffset ? tableRef : (dataTableRef != 
null? extendedTableRef: tableRef),
                 this.mappedColumns.get(index).getPosition());
         try {
             return colRef.newColumnExpression();
@@ -306,10 +311,18 @@ public class TableMapping {
         int columnCount = 0;
         for (int i = extendedColumnsOffset; i < mappedColumns.size(); i++) {
             if (columnRef.get(i)) {
-                PColumn dataColumn = ((ProjectedColumn) mappedColumns.get(i))
-                        .getSourceColumnRef().getColumn();
-                cf.add(dataColumn.getFamilyName().getString());
-                columnCount++;
+                if(dataTableRef != null) {
+                    PColumn dataColumn = ((ProjectedColumn) 
mappedColumns.get(i))
+                            .getSourceColumnRef().getColumn();
+                    cf.add(dataColumn.getFamilyName().getString());
+                    columnCount++;
+                } else {
+                    PColumn column = mappedColumns.get(i);
+                    if(column.getFamilyName()!=null) {
+                        cf.add(column.getFamilyName().getString());
+                    }
+                    columnCount++;
+                }
             }
         }
         return new org.apache.hadoop.hbase.util.Pair<Integer, 
Integer>(cf.size(), columnCount);
@@ -535,4 +548,8 @@ public class TableMapping {
         
         return projectedColumns;
     }
+    
+    public int getExtendedColumnsOffset() {
+        return extendedColumnsOffset;
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d90870f5/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableModify.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableModify.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableModify.java
index 6aeb638..645545d 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableModify.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableModify.java
@@ -125,14 +125,13 @@ public class PhoenixTableModify extends TableModify 
implements PhoenixRel {
             // TODO TenantId, ViewIndexId, UpdatableViewColumns
             final int[] columnIndexes = new int[targetColumns.size()];
             final int[] pkSlotIndexes = new int[targetColumns.size()];
-            int pkColPosition = -1;
+            int nonPKColumnCount = 0;
             for (int i = 0; i < targetColumns.size(); i++) {
                 PColumn column = targetColumns.get(i);
                 if (SchemaUtil.isPKColumn(column)) {
-                    if (pkColPosition == -1) {
-                        pkColPosition = column.getPosition();
-                    }
-                    pkSlotIndexes[i] = pkColPosition++;
+                    pkSlotIndexes[i] = column.getPosition() - nonPKColumnCount;
+                } else {
+                    nonPKColumnCount++;
                 }
                 columnIndexes[i] = column.getPosition();
             }

Reply via email to