Repository: phoenix Updated Branches: refs/heads/4.x-HBase-1.1 29bed889d -> 496f376d2
PHOENIX-3271 Distribute UPSERT SELECT across cluster Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/496f376d Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/496f376d Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/496f376d Branch: refs/heads/4.x-HBase-1.1 Commit: 496f376d23e91c01c1d98286b451a4acf765173c Parents: 29bed88 Author: Ankit Singhal <[email protected]> Authored: Tue Jan 31 11:48:24 2017 +0530 Committer: Ankit Singhal <[email protected]> Committed: Tue Jan 31 11:48:24 2017 +0530 ---------------------------------------------------------------------- .../phoenix/monitoring/PhoenixMetricsIT.java | 56 ++++------- .../apache/phoenix/rpc/PhoenixServerRpcIT.java | 6 ++ .../apache/phoenix/compile/UpsertCompiler.java | 38 ++++---- .../UngroupedAggregateRegionObserver.java | 97 ++++++++++++++++++-- .../org/apache/phoenix/schema/PTableImpl.java | 10 ++ 5 files changed, 138 insertions(+), 69 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/496f376d/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java index 16a66df..4d075ab 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java @@ -439,18 +439,31 @@ public class PhoenixMetricsIT extends BaseUniqueNamesOwnClusterIT { public void testMetricsForUpsertSelectWithAutoCommit() throws Exception { String tableName1 = generateUniqueName(); long table1SaltBuckets = 6; - String ddl = "CREATE TABLE " + tableName1 + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)" + " SALT_BUCKETS = " - + table1SaltBuckets; + String ddl = "CREATE TABLE " + tableName1 + " (K BIGINT NOT NULL PRIMARY KEY ROW_TIMESTAMP, V VARCHAR)" + + " SALT_BUCKETS = " + table1SaltBuckets + ", IMMUTABLE_ROWS = true"; Connection ddlConn = DriverManager.getConnection(getUrl()); ddlConn.createStatement().execute(ddl); ddlConn.close(); int numRows = 10; - insertRowsInTable(tableName1, numRows); + String dml = "UPSERT INTO " + tableName1 + " VALUES (?, ?)"; + try (Connection conn = DriverManager.getConnection(getUrl())) { + PreparedStatement stmt = conn.prepareStatement(dml); + for (int i = 1; i <= numRows; i++) { + stmt.setLong(1, i); + stmt.setString(2, "value" + i); + stmt.executeUpdate(); + } + conn.commit(); + } String tableName2 = generateUniqueName(); - ddl = "CREATE TABLE " + tableName2 + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)" + " SALT_BUCKETS = 10"; + ddl = "CREATE TABLE " + tableName2 + " (K BIGINT NOT NULL PRIMARY KEY ROW_TIMESTAMP, V VARCHAR)" + + " SALT_BUCKETS = 10" + ", IMMUTABLE_ROWS = true"; ddlConn = DriverManager.getConnection(getUrl()); ddlConn.createStatement().execute(ddl); + String indexName = generateUniqueName(); + ddl = "CREATE INDEX " + indexName + " ON " + tableName2 + " (V)"; + ddlConn.createStatement().execute(ddl); ddlConn.close(); String upsertSelect = "UPSERT INTO " + tableName2 + " SELECT * FROM " + tableName1; @@ -602,41 +615,6 @@ public class PhoenixMetricsIT extends BaseUniqueNamesOwnClusterIT { } @Test - public void testMetricsForUpsertSelectSameTable() throws Exception { - String tableName = generateUniqueName(); - long table1SaltBuckets = 6; - String ddl = "CREATE TABLE " + tableName + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)" + " SALT_BUCKETS = " - + table1SaltBuckets; - Connection ddlConn = DriverManager.getConnection(getUrl()); - ddlConn.createStatement().execute(ddl); - ddlConn.close(); - int numRows = 10; - insertRowsInTable(tableName, numRows); - - Connection conn = DriverManager.getConnection(getUrl()); - conn.setAutoCommit(false); - String upsertSelect = "UPSERT INTO " + tableName + " SELECT * FROM " + tableName; - conn.createStatement().executeUpdate(upsertSelect); - conn.commit(); - PhoenixConnection pConn = conn.unwrap(PhoenixConnection.class); - - Map<String, Map<String, Long>> mutationMetrics = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(pConn); - // Because auto-commit is off, upsert select into the same table will run on the client. - // So we should have client side read and write metrics available. - assertMutationMetrics(tableName, numRows, mutationMetrics); - Map<String, Map<String, Long>> readMetrics = PhoenixRuntime.getReadMetricsForMutationsSinceLastReset(pConn); - assertReadMetricsForMutatingSql(tableName, table1SaltBuckets, readMetrics); - PhoenixRuntime.resetMetrics(pConn); - // With autocommit on, still, this upsert select runs on the client side. - conn.setAutoCommit(true); - conn.createStatement().executeUpdate(upsertSelect); - Map<String, Map<String, Long>> autoCommitMutationMetrics = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(pConn); - Map<String, Map<String, Long>> autoCommitReadMetrics = PhoenixRuntime.getReadMetricsForMutationsSinceLastReset(pConn); - assertMetricsAreSame(mutationMetrics, autoCommitMutationMetrics, mutationMetricsToSkip); - assertMetricsAreSame(readMetrics, autoCommitReadMetrics, readMetricsToSkip); - } - - @Test public void testOpenConnectionsCounter() throws Exception { long numOpenConnections = GLOBAL_OPEN_PHOENIX_CONNECTIONS.getMetric().getValue(); try (Connection conn = DriverManager.getConnection(getUrl())) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/496f376d/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixServerRpcIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixServerRpcIT.java b/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixServerRpcIT.java index 92f7294..410f02c 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixServerRpcIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixServerRpcIT.java @@ -153,6 +153,12 @@ public class PhoenixServerRpcIT extends BaseUniqueNamesOwnClusterIT { // verify that that index queue is used only once (for the first upsert) Mockito.verify(TestPhoenixIndexRpcSchedulerFactory.getIndexRpcExecutor()).dispatch(Mockito.any(CallRunner.class)); + + TestPhoenixIndexRpcSchedulerFactory.reset(); + conn.createStatement().execute( + "CREATE INDEX " + indexName + "_1 ON " + dataTableFullName + " (v1) INCLUDE (v2)"); + // verify that that index queue is used and only once (during Upsert Select on server to build the index) + Mockito.verify(TestPhoenixIndexRpcSchedulerFactory.getIndexRpcExecutor()).dispatch(Mockito.any(CallRunner.class)); } finally { conn.close(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/496f376d/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java index 32ce6ad..18070d4 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java @@ -85,6 +85,7 @@ import org.apache.phoenix.schema.MetaDataEntityNotFoundException; import org.apache.phoenix.schema.PColumn; import org.apache.phoenix.schema.PColumnImpl; import org.apache.phoenix.schema.PName; +import org.apache.phoenix.schema.PNameFactory; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTable.IndexType; import org.apache.phoenix.schema.PTable.ViewType; @@ -506,7 +507,7 @@ public class UpsertCompiler { && tableRefToBe.equals(selectResolver.getTables().get(0)); tableRefToBe = adjustTimestampToMinOfSameTable(tableRefToBe, selectResolver.getTables()); /* We can run the upsert in a coprocessor if: - * 1) from has only 1 table and the into table matches from table + * 1) from has only 1 table * 2) the select query isn't doing aggregation (which requires a client-side final merge) * 3) autoCommit is on * 4) the table is not immutable with indexes, as the client is the one that figures out the additional @@ -523,9 +524,10 @@ public class UpsertCompiler { parallelIteratorFactoryToBe = new UpsertingParallelIteratorFactory(connection, tableRefToBe, useServerTimestampToBe); // If we're in the else, then it's not an aggregate, distinct, limited, or sequence using query, // so we might be able to run it entirely on the server side. - // For a table with row timestamp column, we can't guarantee that the row key will reside in the // region space managed by region servers. So we bail out on executing on server side. - runOnServer = sameTable && isAutoCommit && !table.isTransactional() && !(table.isImmutableRows() && !table.getIndexes().isEmpty()) && table.getRowTimestampColPos() == -1; + runOnServer = isAutoCommit && !table.isTransactional() + && !(table.isImmutableRows() && !table.getIndexes().isEmpty()) + && !select.isJoin() && table.getRowTimestampColPos() == -1; } // If we may be able to run on the server, add a hint that favors using the data table // if all else is equal. @@ -599,7 +601,6 @@ public class UpsertCompiler { if (valueNodes == null) { queryPlanToBe = new QueryOptimizer(services).optimize(queryPlanToBe, statement, targetColumns, parallelIteratorFactoryToBe); projectorToBe = queryPlanToBe.getProjector(); - runOnServer &= queryPlanToBe.getTableRef().equals(tableRefToBe); } final List<PColumn> allColumns = allColumnsToBe; final RowProjector projector = projectorToBe; @@ -657,41 +658,34 @@ public class UpsertCompiler { Expression literalNull = LiteralExpression.newConstant(null, column.getDataType(), Determinism.ALWAYS); projectedExpressions.add(literalNull); allColumnsIndexes[pos] = column.getPosition(); - } + } // Swap select expression at pos with i Collections.swap(projectedExpressions, i, pos); // Swap column indexes and reverse column indexes too int tempPos = allColumnsIndexes[i]; allColumnsIndexes[i] = allColumnsIndexes[pos]; allColumnsIndexes[pos] = tempPos; - reverseColumnIndexes[tempPos] = reverseColumnIndexes[i]; + reverseColumnIndexes[tempPos] = pos; reverseColumnIndexes[i] = i; } - // If any pk slots are changing, be conservative and don't run this server side. - // If the row ends up living in a different region, we'll get an error otherwise. - for (int i = 0; i < table.getPKColumns().size(); i++) { - PColumn column = table.getPKColumns().get(i); - Expression source = projectedExpressions.get(i); - if (source == null || !source.equals(new ColumnRef(tableRef, column.getPosition()).newColumnExpression())) { - // TODO: we could check the region boundaries to see if the pk will still be in it. - runOnServer = false; // bail on running server side, since PK may be changing - break; - } - } - + //////////////////////////////////////////////////////////////////// // UPSERT SELECT run server-side ///////////////////////////////////////////////////////////////////// if (runOnServer) { // Iterate through columns being projected List<PColumn> projectedColumns = Lists.newArrayListWithExpectedSize(projectedExpressions.size()); - for (int i = 0; i < projectedExpressions.size(); i++) { + int posOff = table.getBucketNum() != null ? 1 : 0; + for (int i = 0 ; i < projectedExpressions.size(); i++) { // Must make new column if position has changed PColumn column = allColumns.get(allColumnsIndexes[i]); - projectedColumns.add(column.getPosition() == i ? column : new PColumnImpl(column, i)); + projectedColumns.add(column.getPosition() == i + posOff ? column : new PColumnImpl(column, i)); } // Build table from projectedColumns - PTable projectedTable = PTableImpl.makePTable(table, projectedColumns); + // Hack to add default column family to be used on server in case no value column is projected. + PTable projectedTable = PTableImpl.makePTable(table, projectedColumns, + PNameFactory.newName(SchemaUtil.getEmptyColumnFamily(table))); + SelectStatement select = SelectStatement.create(SelectStatement.COUNT_ONE, upsert.getHint()); StatementContext statementContext = queryPlan.getContext(); @@ -717,7 +711,7 @@ public class UpsertCompiler { scan.setAttribute(BaseScannerRegionObserver.UPSERT_SELECT_EXPRS, UngroupedAggregateRegionObserver.serialize(projectedExpressions)); // Ignore order by - it has no impact - final QueryPlan aggPlan = new AggregatePlan(context, select, tableRef, aggProjector, null,null, OrderBy.EMPTY_ORDER_BY, null, GroupBy.EMPTY_GROUP_BY, null); + final QueryPlan aggPlan = new AggregatePlan(context, select, statementContext.getCurrentTable(), aggProjector, null,null, OrderBy.EMPTY_ORDER_BY, null, GroupBy.EMPTY_GROUP_BY, null); return new MutationPlan() { @Override public ParameterMetaData getParameterMetaData() { http://git-wip-us.apache.org/repos/asf/phoenix/blob/496f376d/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java index a888bb2..db3c792 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java @@ -53,6 +53,7 @@ import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Scan; @@ -88,12 +89,14 @@ import org.apache.phoenix.index.PhoenixIndexCodec; import org.apache.phoenix.join.HashJoinInfo; import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.query.QueryServicesOptions; +import org.apache.phoenix.schema.ColumnRef; import org.apache.phoenix.schema.PColumn; import org.apache.phoenix.schema.PRow; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTableImpl; import org.apache.phoenix.schema.RowKeySchema; import org.apache.phoenix.schema.SortOrder; +import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.schema.ValueSchema.Field; import org.apache.phoenix.schema.stats.StatisticsCollectionRunTracker; import org.apache.phoenix.schema.stats.StatisticsCollector; @@ -132,8 +135,6 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver // TODO: move all constants into a single class public static final String UNGROUPED_AGG = "UngroupedAgg"; public static final String DELETE_AGG = "DeleteAgg"; - public static final String UPSERT_SELECT_TABLE = "UpsertSelectTable"; - public static final String UPSERT_SELECT_EXPRS = "UpsertSelectExprs"; public static final String DELETE_CQ = "DeleteCQ"; public static final String DELETE_CF = "DeleteCF"; public static final String EMPTY_CF = "EmptyCF"; @@ -210,6 +211,40 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver logger.debug("Committing bactch of " + mutations.size() + " mutations for " + region.getRegionInfo().getTable().getNameAsString()); region.batchMutate(mutations.toArray(mutationArray), HConstants.NO_NONCE, HConstants.NO_NONCE); } + + private void commitBatchWithHTable(HTable table, Region region, List<Mutation> mutations, byte[] indexUUID, + long blockingMemstoreSize, byte[] indexMaintainersPtr, byte[] txState) throws IOException { + + if (indexUUID != null) { + // Need to add indexMaintainers for each mutation as table.batch can be distributed across servers + for (Mutation m : mutations) { + if (indexMaintainersPtr != null) { + m.setAttribute(PhoenixIndexCodec.INDEX_MD, indexMaintainersPtr); + } + if (txState != null) { + m.setAttribute(BaseScannerRegionObserver.TX_STATE, txState); + } + m.setAttribute(PhoenixIndexCodec.INDEX_UUID, indexUUID); + } + } + // When memstore size reaches blockingMemstoreSize we are waiting 3 seconds for the + // flush happen which decrease the memstore size and then writes allowed on the region. + for (int i = 0; region.getMemstoreSize() > blockingMemstoreSize && i < 30; i++) { + try { + checkForRegionClosing(); + Thread.sleep(100); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException(e); + } + } + logger.debug("Committing batch of " + mutations.size() + " mutations for " + table); + try { + table.batch(mutations); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } /** * There is a chance that region might be closing while running balancer/move/merge. In this @@ -308,12 +343,18 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver byte[] deleteCQ = null; byte[] deleteCF = null; byte[] emptyCF = null; + HTable targetHTable = null; + boolean areMutationInSameRegion = true; ImmutableBytesWritable ptr = new ImmutableBytesWritable(); if (upsertSelectTable != null) { isUpsert = true; projectedTable = deserializeTable(upsertSelectTable); + targetHTable = new HTable(env.getConfiguration(), projectedTable.getPhysicalName().getBytes()); selectExpressions = deserializeExpressions(scan.getAttribute(BaseScannerRegionObserver.UPSERT_SELECT_EXPRS)); values = new byte[projectedTable.getPKColumns().size()][]; + areMutationInSameRegion = Bytes.compareTo(targetHTable.getTableName(), + region.getTableDesc().getTableName().getName()) == 0 + && !isPkPositionChanging(new TableRef(projectedTable), selectExpressions); } else { byte[] isDeleteAgg = scan.getAttribute(BaseScannerRegionObserver.DELETE_AGG); @@ -522,10 +563,15 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver delete.setAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]); } else if (isUpsert) { Arrays.fill(values, null); - int i = 0; + int bucketNumOffset = 0; + if (projectedTable.getBucketNum() != null) { + values[0] = new byte[] { 0 }; + bucketNumOffset = 1; + } + int i = bucketNumOffset; List<PColumn> projectedColumns = projectedTable.getColumns(); for (; i < projectedTable.getPKColumns().size(); i++) { - Expression expression = selectExpressions.get(i); + Expression expression = selectExpressions.get(i - bucketNumOffset); if (expression.evaluate(result, ptr)) { values[i] = ptr.copyBytes(); // If SortOrder from expression in SELECT doesn't match the @@ -535,12 +581,14 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver SortOrder.invert(values[i], 0, values[i], 0, values[i].length); } + }else{ + values[i] = ByteUtil.EMPTY_BYTE_ARRAY; } } projectedTable.newKey(ptr, values); PRow row = projectedTable.newRow(kvBuilder, ts, ptr, false); for (; i < projectedColumns.size(); i++) { - Expression expression = selectExpressions.get(i); + Expression expression = selectExpressions.get(i - bucketNumOffset); if (expression.evaluate(result, ptr)) { PColumn column = projectedColumns.get(i); if (!column.getDataType().isSizeCompatible(ptr, null, @@ -605,8 +653,8 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver List<List<Mutation>> batchMutationList = MutationState.getMutationBatchList(batchSize, batchSizeBytes, mutations); for (List<Mutation> batchMutations : batchMutationList) { - commitBatch(region, batchMutations, indexUUID, blockingMemStoreSize, indexMaintainersPtr, - txState); + commit(region, batchMutations, indexUUID, blockingMemStoreSize, indexMaintainersPtr, + txState, areMutationInSameRegion, targetHTable); batchMutations.clear(); } mutations.clear(); @@ -624,7 +672,9 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver } } while (hasMore); if (!mutations.isEmpty()) { - commitBatch(region, mutations, indexUUID, blockingMemStoreSize, indexMaintainersPtr, txState); + commit(region, mutations, indexUUID, blockingMemStoreSize, indexMaintainersPtr, txState, + areMutationInSameRegion, targetHTable); + mutations.clear(); } if (!indexMutations.isEmpty()) { @@ -638,6 +688,9 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver scansReferenceCount--; } } + if (targetHTable != null) { + targetHTable.close(); + } try { innerScanner.close(); } finally { @@ -678,8 +731,36 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver } }; return scanner; + + } + + private void commit(Region region, List<Mutation> mutations, byte[] indexUUID, long blockingMemstoreSize, + byte[] indexMaintainersPtr, byte[] txState, boolean areMutationsForSameRegion, HTable hTable) + throws IOException { + if (!areMutationsForSameRegion) { + assert hTable != null;// table cannot be null + commitBatchWithHTable(hTable, region, mutations, indexUUID, blockingMemstoreSize, indexMaintainersPtr, + txState); + } else { + commitBatch(region, mutations, indexUUID, blockingMemstoreSize, indexMaintainersPtr, txState); + } + } + + private boolean isPkPositionChanging(TableRef tableRef, List<Expression> projectedExpressions) throws SQLException { + // If the row ends up living in a different region, we'll get an error otherwise. + for (int i = 0; i < tableRef.getTable().getPKColumns().size(); i++) { + PColumn column = tableRef.getTable().getPKColumns().get(i); + Expression source = projectedExpressions.get(i); + if (source == null || !source + .equals(new ColumnRef(tableRef, column.getPosition()).newColumnExpression())) { return true; } + } + return false; } + private boolean readyToCommit(List<Mutation> mutations,int batchSize){ + return !mutations.isEmpty() && batchSize > 0 && + mutations.size() > batchSize; + } @Override public InternalScanner preCompact(final ObserverContext<RegionCoprocessorEnvironment> c, final Store store, final InternalScanner scanner, final ScanType scanType) throws IOException { http://git-wip-us.apache.org/repos/asf/phoenix/blob/496f376d/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java index 98a0b99..b4e0a06 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java @@ -250,6 +250,16 @@ public class PTableImpl implements PTable { table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema()); } + + public static PTableImpl makePTable(PTable table, Collection<PColumn> columns, PName defaultFamily) throws SQLException { + return new PTableImpl( + table.getTenantId(), table.getSchemaName(), table.getTableName(), table.getType(), table.getIndexState(), table.getTimeStamp(), + table.getSequenceNumber(), table.getPKName(), table.getBucketNum(), columns, table.getParentSchemaName(), table.getParentTableName(), + table.getIndexes(), table.isImmutableRows(), table.getPhysicalNames(), defaultFamily, table.getViewStatement(), + table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(), + table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency(), + table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema()); + } public static PTableImpl makePTable(PTable table, long timeStamp, long sequenceNumber, Collection<PColumn> columns) throws SQLException { return new PTableImpl(
