Repository: phoenix Updated Branches: refs/heads/calcite 20462f03a -> d90870f5d
PHOENIX-3732 Support for dynamic columns in UPSERT in Phoenix-Calcite(Rajeshbabu) Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/d90870f5 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/d90870f5 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/d90870f5 Branch: refs/heads/calcite Commit: d90870f5d64797c4258b9a7fb66967ef26b113d4 Parents: 20462f0 Author: Rajeshbabu Chintaguntla <[email protected]> Authored: Wed Jun 28 15:22:57 2017 +0530 Committer: Rajeshbabu Chintaguntla <[email protected]> Committed: Wed Jun 28 15:22:57 2017 +0530 ---------------------------------------------------------------------- .../apache/phoenix/end2end/DynamicUpsertIT.java | 4 +- .../phoenix/calcite/PhoenixSqlConformance.java | 5 + .../apache/phoenix/calcite/PhoenixTable.java | 132 ++++++++++++++++--- .../apache/phoenix/calcite/TableMapping.java | 29 +++- .../phoenix/calcite/rel/PhoenixTableModify.java | 9 +- 5 files changed, 151 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/d90870f5/phoenix-core/src/it/java/org/apache/phoenix/end2end/DynamicUpsertIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/DynamicUpsertIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/DynamicUpsertIT.java index 6f4af72..18513d7 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/DynamicUpsertIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/DynamicUpsertIT.java @@ -200,7 +200,7 @@ public class DynamicUpsertIT extends ParallelStatsDisabledIT { /** * Test an upsert of two conflicting dynamic columns */ - @Test(expected = ColumnAlreadyExistsException.class) + @Test public void testAmbiguousDynamicUpsert() throws Exception { String upsertquery = "UPSERT INTO " + tableName + " (a.DynCol VARCHAR,a.DynCol INTEGER) VALUES('dynCol',1)"; String url = getUrl() + ";"; @@ -209,6 +209,8 @@ public class DynamicUpsertIT extends ParallelStatsDisabledIT { try { PreparedStatement statement = conn.prepareStatement(upsertquery); statement.execute(); + } catch (SQLException e) { + assertTrue(e.getCause().getCause().getMessage().contains("Duplicate name")); } finally { conn.close(); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/d90870f5/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSqlConformance.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSqlConformance.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSqlConformance.java index dee85db..77bd75e 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSqlConformance.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSqlConformance.java @@ -85,4 +85,9 @@ public class PhoenixSqlConformance implements SqlConformance { public boolean isHavingAlias() { return true; } + + @Override + public boolean allowExtend() { + return true; + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/d90870f5/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTable.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTable.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTable.java index 4175a0e..b40c37c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTable.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTable.java @@ -2,6 +2,7 @@ package org.apache.phoenix.calcite; import java.io.IOException; import java.sql.SQLException; +import java.util.ArrayList; import java.util.List; import org.apache.calcite.plan.RelOptTable; @@ -20,7 +21,9 @@ import org.apache.calcite.rel.type.RelDataTypeField; import org.apache.calcite.rex.RexBuilder; import org.apache.calcite.rex.RexNode; import org.apache.calcite.schema.CustomColumnResolvingTable; +import org.apache.calcite.schema.ExtensibleTable; import org.apache.calcite.schema.Statistic; +import org.apache.calcite.schema.Table; import org.apache.calcite.schema.TranslatableTable; import org.apache.calcite.schema.Wrapper; import org.apache.calcite.schema.impl.AbstractTable; @@ -43,19 +46,26 @@ import org.apache.phoenix.jdbc.PhoenixStatement; import org.apache.phoenix.parse.ColumnDef; import org.apache.phoenix.parse.NamedTableNode; import org.apache.phoenix.parse.TableName; +import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; +import org.apache.phoenix.schema.ColumnFamilyNotFoundException; import org.apache.phoenix.schema.PColumn; +import org.apache.phoenix.schema.PColumnImpl; +import org.apache.phoenix.schema.PNameFactory; import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.PTableImpl; import org.apache.phoenix.schema.SortOrder; import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.schema.PTable.IndexType; import org.apache.phoenix.schema.PTableType; import org.apache.phoenix.schema.stats.StatisticsUtil; +import org.apache.phoenix.schema.types.PDataType; import org.apache.phoenix.util.SchemaUtil; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; import com.google.common.collect.Lists; /** @@ -63,7 +73,7 @@ import com.google.common.collect.Lists; * Phoenix. */ public class PhoenixTable extends AbstractTable - implements TranslatableTable, CustomColumnResolvingTable, Wrapper { + implements ExtensibleTable, TranslatableTable, CustomColumnResolvingTable, Wrapper { public final TableMapping tableMapping; public final ImmutableBitSet pkBitSet; public final RelCollation collation; @@ -73,6 +83,20 @@ public class PhoenixTable extends AbstractTable public final RelDataTypeFactory typeFactory; public final InitializerExpressionFactory initializerExpressionFactory; + private PhoenixTable(TableMapping tableMapping, ImmutableBitSet pkBitSet, + RelCollation collation, long byteCount, long rowCount, PhoenixConnection pc, + InitializerExpressionFactory initializerExpressionFactory, + RelDataTypeFactory typeFactory) { + this.tableMapping = tableMapping; + this.pkBitSet = pkBitSet; + this.collation = collation; + this.byteCount = byteCount; + this.rowCount = rowCount; + this.pc = pc; + this.initializerExpressionFactory = initializerExpressionFactory; + this.typeFactory = typeFactory; + } + public PhoenixTable(PhoenixConnection pc, TableRef tableRef, final RelDataTypeFactory typeFactory) throws SQLException { this.pc = Preconditions.checkNotNull(pc); PTable pTable = tableRef.getTable(); @@ -118,22 +142,31 @@ public class PhoenixTable extends AbstractTable byteCount = dataTableEstimatedCount.getSecond(); } } else { - // TODO The props might not be the same as server props. - int guidepostPerRegion = pc.getQueryServices().getProps().getInt( - QueryServices.STATS_GUIDEPOST_PER_REGION_ATTRIB, - QueryServicesOptions.DEFAULT_STATS_GUIDEPOST_PER_REGION); - long guidepostWidth = pc.getQueryServices().getProps().getLong( - QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB, - QueryServicesOptions.DEFAULT_STATS_GUIDEPOST_WIDTH_BYTES); - HTableDescriptor desc = null; - if (guidepostPerRegion > 0) { - desc = pc.getQueryServices().getAdmin().getTableDescriptor( - pTable.getPhysicalName().getBytes()); - } - byteCount = StatisticsUtil.getGuidePostDepth( + if(pTable.getType() == PTableType.INDEX) { + rowCount = 1; + byteCount = 1; + } else { + // TODO The props might not be the same as server props. + int guidepostPerRegion = pc.getQueryServices().getProps().getInt( + QueryServices.STATS_GUIDEPOST_PER_REGION_ATTRIB, + QueryServicesOptions.DEFAULT_STATS_GUIDEPOST_PER_REGION); + long guidepostWidth = pc.getQueryServices().getProps().getLong( + QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB, + QueryServicesOptions.DEFAULT_STATS_GUIDEPOST_WIDTH_BYTES); + HTableDescriptor desc = null; + if (guidepostPerRegion > 0) { + desc = + pc.getQueryServices() + .getAdmin() + .getTableDescriptor( + dataTable != null ? dataTable.getTable().getPhysicalName() + .getBytes() : pTable.getPhysicalName().getBytes()); + } + byteCount = StatisticsUtil.getGuidePostDepth( guidepostPerRegion, guidepostWidth, desc) / 2; - long rowSize = SchemaUtil.estimateRowSize(pTable); - rowCount = byteCount / rowSize; + long rowSize = SchemaUtil.estimateRowSize(dataTable != null ? dataTable.getTable():pTable); + rowCount = byteCount / rowSize; + } } } catch (SQLException | IOException e) { throw new RuntimeException(e); @@ -242,4 +275,71 @@ public class PhoenixTable extends AbstractTable rexBuilder); } } + + @Override + public Table extend(List<RelDataTypeField> fields) { + final ImmutableList.Builder<PColumn> extendedColumns = ImmutableList.builder(); + PTable table = tableMapping.getTableRef().getTable(); + for (RelDataTypeField field : fields) { + final RelDataType colType = field.getType(); + int index = field.getName().indexOf(QueryConstants.NAME_SEPARATOR); + String columnFamily = null; + String columnName = field.getName(); + if (index > 0) { + columnFamily = field.getName().substring(0, index); + try { + table.getColumnFamily(columnFamily); + } catch (ColumnFamilyNotFoundException e) { + throw new RuntimeException(e); + } + columnName = field.getName().substring(index + 1); + } else { + columnFamily = + tableMapping.getTableRef().getTable().getDefaultFamilyName() == null ? QueryConstants.DEFAULT_COLUMN_FAMILY + : tableMapping.getTableRef().getTable().getDefaultFamilyName() + .getString(); + } + final PColumn column = + new PColumnImpl(PNameFactory.newName(columnName), + PNameFactory.newName(columnFamily), PDataType.fromSqlTypeName(field + .getType().getSqlTypeName().getName()), + colType.getPrecision() == RelDataType.PRECISION_NOT_SPECIFIED ? null + : colType.getPrecision(), + colType.getScale() == RelDataType.SCALE_NOT_SPECIFIED ? null : colType + .getScale(), colType.isNullable(), + (table.getBucketNum() != null ? (field.getIndex() + 1) : field + .getIndex()), + SortOrder.ASC,// TODO: get this from metastore? test if specifying ASC + // or DESC is allowed. use Docker image + colType.isStruct() ? field.getType().getFieldCount() : 0, null, false, + null, false, true, null); + extendedColumns.add(column); + } + + List<PColumn> tableColumns = table.getColumns(); + if(table.getBucketNum()!=null) { + tableColumns = new ArrayList<PColumn>(table.getColumns().size()-1); + for(int i = 1; i < table.getColumns().size() ; i++) { + tableColumns.add(table.getColumns().get(i)); + } + } + final List<PColumn> allColumns = + ImmutableList.copyOf(Iterables.concat(tableColumns, extendedColumns.build())); + try { + final PTable extendedTable = + PTableImpl.makePTable(tableMapping.getPTable(), allColumns); + final TableMapping newMapping = + new TableMapping(extendedTable, tableMapping.getExtendedColumnsOffset()); + return new PhoenixTable(newMapping, pkBitSet, collation, byteCount, rowCount, pc, + initializerExpressionFactory, typeFactory); + } catch (SQLException e) { + throw new RuntimeException("Could not create extended table", e); + } + } + + @Override + public int getExtendedColumnOffset() { + return tableMapping.getExtendedColumnsOffset(); + } + } http://git-wip-us.apache.org/repos/asf/phoenix/blob/d90870f5/phoenix-core/src/main/java/org/apache/phoenix/calcite/TableMapping.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/TableMapping.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/TableMapping.java index 966d552..c146230 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/TableMapping.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/TableMapping.java @@ -67,7 +67,7 @@ public class TableMapping { private final TableRef tableRef; private final TableRef dataTableRef; private final List<PColumn> mappedColumns; - private final int extendedColumnsOffset; + private int extendedColumnsOffset; private final TableRef extendedTableRef; // For column resolving private final List<String> names = Lists.newArrayList(); @@ -82,6 +82,11 @@ public class TableMapping { init(); } + public TableMapping(PTable table, int extendedColumnsOffset) { + this(table); + this.extendedColumnsOffset = extendedColumnsOffset; + } + public TableMapping(TableRef tableRef, TableRef dataTableRef, boolean extend) throws SQLException { this.tableRef = tableRef; this.dataTableRef = dataTableRef; @@ -273,7 +278,7 @@ public class TableMapping { public Expression newColumnExpression(int index) { ColumnRef colRef = new ColumnRef( - index < extendedColumnsOffset ? tableRef : extendedTableRef, + index < extendedColumnsOffset ? tableRef : (dataTableRef != null? extendedTableRef: tableRef), this.mappedColumns.get(index).getPosition()); try { return colRef.newColumnExpression(); @@ -306,10 +311,18 @@ public class TableMapping { int columnCount = 0; for (int i = extendedColumnsOffset; i < mappedColumns.size(); i++) { if (columnRef.get(i)) { - PColumn dataColumn = ((ProjectedColumn) mappedColumns.get(i)) - .getSourceColumnRef().getColumn(); - cf.add(dataColumn.getFamilyName().getString()); - columnCount++; + if(dataTableRef != null) { + PColumn dataColumn = ((ProjectedColumn) mappedColumns.get(i)) + .getSourceColumnRef().getColumn(); + cf.add(dataColumn.getFamilyName().getString()); + columnCount++; + } else { + PColumn column = mappedColumns.get(i); + if(column.getFamilyName()!=null) { + cf.add(column.getFamilyName().getString()); + } + columnCount++; + } } } return new org.apache.hadoop.hbase.util.Pair<Integer, Integer>(cf.size(), columnCount); @@ -535,4 +548,8 @@ public class TableMapping { return projectedColumns; } + + public int getExtendedColumnsOffset() { + return extendedColumnsOffset; + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/d90870f5/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableModify.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableModify.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableModify.java index 6aeb638..645545d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableModify.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableModify.java @@ -125,14 +125,13 @@ public class PhoenixTableModify extends TableModify implements PhoenixRel { // TODO TenantId, ViewIndexId, UpdatableViewColumns final int[] columnIndexes = new int[targetColumns.size()]; final int[] pkSlotIndexes = new int[targetColumns.size()]; - int pkColPosition = -1; + int nonPKColumnCount = 0; for (int i = 0; i < targetColumns.size(); i++) { PColumn column = targetColumns.get(i); if (SchemaUtil.isPKColumn(column)) { - if (pkColPosition == -1) { - pkColPosition = column.getPosition(); - } - pkSlotIndexes[i] = pkColPosition++; + pkSlotIndexes[i] = column.getPosition() - nonPKColumnCount; + } else { + nonPKColumnCount++; } columnIndexes[i] = column.getPosition(); }
