PHOENIX-2446 Immutable index - Index vs base table row count does not match when index is created during data load
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/9ab519a3 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/9ab519a3 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/9ab519a3 Branch: refs/heads/4.x-HBase-0.98 Commit: 9ab519a3dbfc6821e47e8d13a3296f893a5e829b Parents: ea65e59 Author: Thomas D'Silva <[email protected]> Authored: Fri Jan 8 15:37:31 2016 -0800 Committer: Thomas D'Silva <[email protected]> Committed: Thu Jan 21 21:18:45 2016 -0800 ---------------------------------------------------------------------- .../phoenix/end2end/index/ImmutableIndexIT.java | 81 ++++++++--- .../apache/phoenix/execute/PartialCommitIT.java | 2 +- .../cache/aggcache/SpillableGroupByCache.java | 14 +- .../apache/phoenix/compile/FromCompiler.java | 8 +- .../phoenix/compile/ListJarsQueryPlan.java | 2 +- .../compile/PostLocalIndexDDLCompiler.java | 104 +++++++++++++ .../apache/phoenix/compile/TraceQueryPlan.java | 2 +- .../apache/phoenix/compile/UnionCompiler.java | 2 +- .../phoenix/coprocessor/BaseRegionScanner.java | 11 +- .../GroupedAggregateRegionObserver.java | 29 +--- .../coprocessor/MetaDataEndpointImpl.java | 3 +- .../coprocessor/MetaDataRegionObserver.java | 8 +- .../phoenix/coprocessor/ScanRegionObserver.java | 12 +- .../UngroupedAggregateRegionObserver.java | 14 +- .../coprocessor/generated/PTableProtos.java | 145 +++++++++++++++---- .../apache/phoenix/execute/MutationState.java | 57 ++++---- .../org/apache/phoenix/query/QueryServices.java | 2 + .../phoenix/query/QueryServicesOptions.java | 9 +- .../apache/phoenix/schema/DelegateColumn.java | 5 + .../apache/phoenix/schema/MetaDataClient.java | 133 +++++++---------- .../java/org/apache/phoenix/schema/PColumn.java | 2 + .../org/apache/phoenix/schema/PColumnImpl.java | 21 ++- .../apache/phoenix/schema/PMetaDataImpl.java | 2 +- .../org/apache/phoenix/schema/SaltingUtil.java | 2 +- .../phoenix/execute/CorrelatePlanTest.java | 2 +- .../phoenix/execute/UnnestArrayPlanTest.java | 4 +- .../expression/ColumnExpressionTest.java | 8 +- .../iterate/AggregateResultScannerTest.java | 4 + .../phoenix/query/QueryServicesTestImpl.java | 4 +- phoenix-protocol/src/main/MetaDataService.proto | 2 +- phoenix-protocol/src/main/PTable.proto | 1 + 31 files changed, 446 insertions(+), 249 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/9ab519a3/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java index 7171382..c18e4ab 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java @@ -20,6 +20,7 @@ package org.apache.phoenix.end2end.index; import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import java.sql.Connection; import java.sql.DriverManager; @@ -34,6 +35,7 @@ import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -48,6 +50,7 @@ import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.util.Bytes; import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT; import org.apache.phoenix.end2end.Shadower; +import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.query.BaseTest; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.util.PropertiesUtil; @@ -55,6 +58,7 @@ import org.apache.phoenix.util.ReadOnlyProps; import org.apache.phoenix.util.SchemaUtil; import org.apache.phoenix.util.TestUtil; import org.junit.BeforeClass; +import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -68,6 +72,7 @@ import com.google.common.collect.Maps; public class ImmutableIndexIT extends BaseHBaseManagedTimeIT { private final boolean localIndex; + private final boolean transactional; private final String tableDDLOptions; private final String tableName; private final String indexName; @@ -80,6 +85,7 @@ public class ImmutableIndexIT extends BaseHBaseManagedTimeIT { public ImmutableIndexIT(boolean localIndex, boolean transactional) { this.localIndex = localIndex; + this.transactional = transactional; StringBuilder optionBuilder = new StringBuilder("IMMUTABLE_ROWS=true"); if (transactional) { optionBuilder.append(", TRANSACTIONAL=true"); @@ -98,16 +104,55 @@ public class ImmutableIndexIT extends BaseHBaseManagedTimeIT { serverProps.put("hbase.coprocessor.region.classes", CreateIndexRegionObserver.class.getName()); Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(2); clientProps.put(QueryServices.TRANSACTIONS_ENABLED, "true"); + clientProps.put(QueryServices.INDEX_POPULATION_SLEEP_TIME, "15000"); setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet().iterator())); } @Parameters(name="localIndex = {0} , transactional = {1}") public static Collection<Boolean[]> data() { - return Arrays.asList(new Boolean[][] { - { false, true }, { true, true } - }); + return Arrays.asList(new Boolean[][] { + { false, false }, { false, true }, + { true, false }, { true, true } }); } + @Test + @Ignore + public void testDropIfImmutableKeyValueColumn() throws Exception { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { + conn.setAutoCommit(false); + String ddl = + "CREATE TABLE " + fullTableName + BaseTest.TEST_TABLE_SCHEMA + tableDDLOptions; + Statement stmt = conn.createStatement(); + stmt.execute(ddl); + populateTestTable(fullTableName); + ddl = + "CREATE " + (localIndex ? "LOCAL" : "") + " INDEX " + indexName + " ON " + + fullTableName + " (long_col1)"; + stmt.execute(ddl); + + ResultSet rs; + + rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM " + fullTableName); + assertTrue(rs.next()); + assertEquals(3, rs.getInt(1)); + rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM " + fullIndexName); + assertTrue(rs.next()); + assertEquals(3, rs.getInt(1)); + + conn.setAutoCommit(true); + String dml = "DELETE from " + fullTableName + " WHERE long_col2 = 4"; + try { + conn.createStatement().execute(dml); + fail(); + } catch (SQLException e) { + assertEquals(SQLExceptionCode.INVALID_FILTER_ON_IMMUTABLE_ROWS.getErrorCode(), + e.getErrorCode()); + } + + conn.createStatement().execute("DROP TABLE " + fullTableName); + } + } @Test public void testCreateIndexDuringUpsertSelect() throws Exception { @@ -119,8 +164,7 @@ public class ImmutableIndexIT extends BaseHBaseManagedTimeIT { + " (long_pk, varchar_pk)" + " INCLUDE (long_col1, long_col2)"; - Connection conn = DriverManager.getConnection(getUrl(), props); - try { + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { conn.setAutoCommit(false); Statement stmt = conn.createStatement(); stmt.execute(ddl); @@ -133,7 +177,6 @@ public class ImmutableIndexIT extends BaseHBaseManagedTimeIT { String upsertSelect = "UPSERT INTO " + TABLE_NAME + "(varchar_pk, char_pk, int_pk, long_pk, decimal_pk, date_pk) " + "SELECT varchar_pk||'_upsert_select', char_pk, int_pk, long_pk, decimal_pk, date_pk FROM "+ TABLE_NAME; conn.createStatement().execute(upsertSelect); - ResultSet rs; rs = conn.createStatement().executeQuery("SELECT /*+ NO_INDEX */ COUNT(*) FROM " + TABLE_NAME); assertTrue(rs.next()); @@ -142,9 +185,6 @@ public class ImmutableIndexIT extends BaseHBaseManagedTimeIT { assertTrue(rs.next()); assertEquals(440,rs.getInt(1)); } - finally { - conn.close(); - } } // used to create an index while a batch of rows are being written @@ -156,7 +196,7 @@ public class ImmutableIndexIT extends BaseHBaseManagedTimeIT { String tableName = c.getEnvironment().getRegion().getRegionInfo() .getTable().getNameAsString(); if (tableName.equalsIgnoreCase(TABLE_NAME) - // create the index after the second batch of 1000 rows + // create the index after the second batch && Bytes.startsWith(put.getRow(), Bytes.toBytes("varchar200_upsert_select"))) { try { Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); @@ -171,13 +211,14 @@ public class ImmutableIndexIT extends BaseHBaseManagedTimeIT { } private static class UpsertRunnable implements Runnable { - private static final int NUM_ROWS_IN_BATCH = 10000; + private static final int NUM_ROWS_IN_BATCH = 1000; private final String fullTableName; public UpsertRunnable(String fullTableName) { this.fullTableName = fullTableName; } + @Override public void run() { Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); try (Connection conn = DriverManager.getConnection(getUrl(), props)) { @@ -190,12 +231,9 @@ public class ImmutableIndexIT extends BaseHBaseManagedTimeIT { fistRowInBatch = false; } conn.commit(); - Thread.sleep(500); } } catch (SQLException e) { throw new RuntimeException(e); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); } } } @@ -213,10 +251,17 @@ public class ImmutableIndexIT extends BaseHBaseManagedTimeIT { Statement stmt = conn.createStatement(); stmt.execute(ddl); - ExecutorService threadPool = Executors.newFixedThreadPool(numThreads); + ExecutorService executorService = Executors.newFixedThreadPool(numThreads, new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + Thread t = Executors.defaultThreadFactory().newThread(r); + t.setDaemon(true); + return t; + } + }); List<Future<?>> futureList = Lists.newArrayListWithExpectedSize(numThreads); for (int i =0; i<numThreads; ++i) { - futureList.add(threadPool.submit(new UpsertRunnable(fullTableName))); + futureList.add(executorService.submit(new UpsertRunnable(fullTableName))); } // upsert some rows before creating the index Thread.sleep(500); @@ -235,8 +280,8 @@ public class ImmutableIndexIT extends BaseHBaseManagedTimeIT { for (Future<?> future : futureList) { future.cancel(true); } - threadPool.shutdownNow(); - threadPool.awaitTermination(30, TimeUnit.SECONDS); + executorService.shutdownNow(); + executorService.awaitTermination(30, TimeUnit.SECONDS); Thread.sleep(100); ResultSet rs; http://git-wip-us.apache.org/repos/asf/phoenix/blob/9ab519a3/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java b/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java index 8d7ebcb..0fb1869 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java @@ -265,7 +265,7 @@ public class PartialCommitIT extends BaseOwnClusterIT { Connection con = driver.connect(url, new Properties()); PhoenixConnection phxCon = new PhoenixConnection(con.unwrap(PhoenixConnection.class)); final Map<TableRef,Map<ImmutableBytesPtr,MutationState.RowMutationState>> mutations = Maps.newTreeMap(new TableRefComparator()); - // passing a null mutation staate forces the connection.newMutationState() to be used to create the MutationState + // passing a null mutation state forces the connection.newMutationState() to be used to create the MutationState return new PhoenixConnection(phxCon, null) { @Override protected MutationState newMutationState(int maxSize) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/9ab519a3/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillableGroupByCache.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillableGroupByCache.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillableGroupByCache.java index ce18cc2..0c76591 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillableGroupByCache.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillableGroupByCache.java @@ -340,12 +340,7 @@ public class SpillableGroupByCache implements GroupByCache { final Iterator<Entry<ImmutableBytesWritable, Aggregator[]>> cacheIter = new EntryIterator(); // scanner using the spillable implementation - return new BaseRegionScanner() { - @Override - public HRegionInfo getRegionInfo() { - return s.getRegionInfo(); - } - + return new BaseRegionScanner(s) { @Override public void close() throws IOException { try { @@ -372,11 +367,6 @@ public class SpillableGroupByCache implements GroupByCache { SINGLE_COLUMN, AGG_TIMESTAMP, value, 0, value.length)); return cacheIter.hasNext(); } - - @Override - public long getMaxResultSize() { - return s.getMaxResultSize(); - } }; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/9ab519a3/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java index 9b2c460..dd93c81 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java @@ -221,7 +221,7 @@ public class FromCompiler { Expression sourceExpression = projector.getColumnProjector(column.getPosition()).getExpression(); PColumnImpl projectedColumn = new PColumnImpl(column.getName(), column.getFamilyName(), sourceExpression.getDataType(), sourceExpression.getMaxLength(), sourceExpression.getScale(), sourceExpression.isNullable(), - column.getPosition(), sourceExpression.getSortOrder(), column.getArraySize(), column.getViewConstant(), column.isViewReferenced(), column.getExpressionStr(), column.isRowTimestamp()); + column.getPosition(), sourceExpression.getSortOrder(), column.getArraySize(), column.getViewConstant(), column.isViewReferenced(), column.getExpressionStr(), column.isRowTimestamp(), column.isDynamic()); projectedColumns.add(projectedColumn); } PTable t = PTableImpl.makePTable(table, projectedColumns); @@ -406,7 +406,7 @@ public class FromCompiler { String fullTableName = SchemaUtil.getTableName(schemaName, tableName); PName tenantId = connection.getTenantId(); PTable theTable = null; - if (updateCacheImmediately || connection.getAutoCommit()) { + if (updateCacheImmediately) { MetaDataMutationResult result = client.updateCache(schemaName, tableName); timeStamp = TransactionUtil.getResolvedTimestamp(connection, result); theTable = result.getTable(); @@ -547,7 +547,7 @@ public class FromCompiler { familyName = PNameFactory.newName(family); } allcolumns.add(new PColumnImpl(name, familyName, dynColumn.getDataType(), dynColumn.getMaxLength(), - dynColumn.getScale(), dynColumn.isNull(), position, dynColumn.getSortOrder(), dynColumn.getArraySize(), null, false, dynColumn.getExpression(), false)); + dynColumn.getScale(), dynColumn.isNull(), position, dynColumn.getSortOrder(), dynColumn.getArraySize(), null, false, dynColumn.getExpression(), false, true)); position++; } theTable = PTableImpl.makePTable(theTable, allcolumns); @@ -645,7 +645,7 @@ public class FromCompiler { } PColumnImpl column = new PColumnImpl(PNameFactory.newName(alias), PNameFactory.newName(QueryConstants.DEFAULT_COLUMN_FAMILY), - null, 0, 0, true, position++, SortOrder.ASC, null, null, false, null, false); + null, 0, 0, true, position++, SortOrder.ASC, null, null, false, null, false, false); columns.add(column); } PTable t = PTableImpl.makePTable(null, PName.EMPTY_NAME, PName.EMPTY_NAME, http://git-wip-us.apache.org/repos/asf/phoenix/blob/9ab519a3/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java index 75d6ef5..7f3277a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java @@ -80,7 +80,7 @@ public class ListJarsQueryPlan implements QueryPlan { PColumn column = new PColumnImpl(PNameFactory.newName("jar_location"), null, PVarchar.INSTANCE, null, null, false, 0, SortOrder.getDefault(), 0, null, - false, null, false); + false, null, false, false); List<PColumn> columns = new ArrayList<PColumn>(); columns.add(column); Expression expression = http://git-wip-us.apache.org/repos/asf/phoenix/blob/9ab519a3/phoenix-core/src/main/java/org/apache/phoenix/compile/PostLocalIndexDDLCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostLocalIndexDDLCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostLocalIndexDDLCompiler.java new file mode 100644 index 0000000..f92738c --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostLocalIndexDDLCompiler.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.compile; + +import java.io.IOException; +import java.sql.SQLException; +import java.util.List; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.phoenix.coprocessor.BaseScannerRegionObserver; +import org.apache.phoenix.execute.MutationState; +import org.apache.phoenix.hbase.index.covered.update.ColumnReference; +import org.apache.phoenix.index.IndexMaintainer; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.jdbc.PhoenixStatement; +import org.apache.phoenix.jdbc.PhoenixStatement.Operation; +import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.SortOrder; +import org.apache.phoenix.schema.TableRef; +import org.apache.phoenix.schema.types.PLong; +import org.apache.phoenix.util.ByteUtil; +import org.apache.phoenix.util.ScanUtil; + +import com.google.common.collect.Lists; + +/** + * For local indexes, we optimize the initial index population by *not* sending + * Puts over the wire for the index rows, as we don't need to do that. Instead, + * we tap into our region observer to generate the index rows based on the data + * rows as we scan + */ +public class PostLocalIndexDDLCompiler { + private final PhoenixConnection connection; + private final String tableName; + + public PostLocalIndexDDLCompiler(PhoenixConnection connection, String tableName) { + this.connection = connection; + this.tableName = tableName; + } + + public MutationPlan compile(final PTable index) throws SQLException { + try (final PhoenixStatement statement = new PhoenixStatement(connection)) { + String query = "SELECT count(*) FROM " + tableName; + final QueryPlan plan = statement.compileQuery(query); + TableRef tableRef = plan.getTableRef(); + Scan scan = plan.getContext().getScan(); + ImmutableBytesWritable ptr = new ImmutableBytesWritable(); + final PTable dataTable = tableRef.getTable(); + List<PTable> indexes = Lists.newArrayListWithExpectedSize(1); + // Only build newly created index. + indexes.add(index); + IndexMaintainer.serialize(dataTable, ptr, indexes, plan.getContext().getConnection()); + // Set attribute on scan that UngroupedAggregateRegionObserver will switch on. + // We'll detect that this attribute was set the server-side and write the index + // rows per region as a result. The value of the attribute will be our persisted + // index maintainers. + // Define the LOCAL_INDEX_BUILD as a new static in BaseScannerRegionObserver + scan.setAttribute(BaseScannerRegionObserver.LOCAL_INDEX_BUILD, ByteUtil.copyKeyBytesIfNecessary(ptr)); + // By default, we'd use a FirstKeyOnly filter as nothing else needs to be projected for count(*). + // However, in this case, we need to project all of the data columns that contribute to the index. + IndexMaintainer indexMaintainer = index.getIndexMaintainer(dataTable, connection); + for (ColumnReference columnRef : indexMaintainer.getAllColumns()) { + scan.addColumn(columnRef.getFamily(), columnRef.getQualifier()); + } + + // Go through MutationPlan abstraction so that we can create local indexes + // with a connectionless connection (which makes testing easier). + return new BaseMutationPlan(plan.getContext(), Operation.UPSERT) { + + @Override + public MutationState execute() throws SQLException { + connection.getMutationState().commitDDLFence(dataTable); + Cell kv = plan.iterator().next().getValue(0); + ImmutableBytesWritable tmpPtr = new ImmutableBytesWritable(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength()); + // A single Cell will be returned with the count(*) - we decode that here + long rowCount = PLong.INSTANCE.getCodec().decodeLong(tmpPtr, SortOrder.getDefault()); + // The contract is to return a MutationState that contains the number of rows modified. In this + // case, it's the number of rows in the data table which corresponds to the number of index + // rows that were added. + return new MutationState(0, connection, rowCount); + } + + }; + } + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/9ab519a3/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java index 667b46b..58cdb64 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java @@ -78,7 +78,7 @@ public class TraceQueryPlan implements QueryPlan { PColumn column = new PColumnImpl(PNameFactory.newName(MetricInfo.TRACE.columnName), null, PLong.INSTANCE, null, null, false, 0, SortOrder.getDefault(), 0, null, - false, null, false); + false, null, false, false); List<PColumn> columns = new ArrayList<PColumn>(); columns.add(column); Expression expression = http://git-wip-us.apache.org/repos/asf/phoenix/blob/9ab519a3/phoenix-core/src/main/java/org/apache/phoenix/compile/UnionCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UnionCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UnionCompiler.java index 3bc1e37..f8b2778 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UnionCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UnionCompiler.java @@ -75,7 +75,7 @@ public class UnionCompiler { String name = selectNodes == null ? colProj.getName() : selectNodes.get(i).getAlias(); PColumnImpl projectedColumn = new PColumnImpl(PNameFactory.newName(name), UNION_FAMILY_NAME, sourceExpression.getDataType(), sourceExpression.getMaxLength(), sourceExpression.getScale(), sourceExpression.isNullable(), - i, sourceExpression.getSortOrder(), 500, null, false, sourceExpression.toString(), false); + i, sourceExpression.getSortOrder(), 500, null, false, sourceExpression.toString(), false, false); projectedColumns.add(projectedColumn); } Long scn = statement.getConnection().getSCN(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/9ab519a3/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseRegionScanner.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseRegionScanner.java index 646e7e8..b32e8f4 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseRegionScanner.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseRegionScanner.java @@ -24,8 +24,12 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.regionserver.RegionScanner; -public abstract class BaseRegionScanner implements RegionScanner { +public abstract class BaseRegionScanner extends DelegateRegionScanner { + public BaseRegionScanner(RegionScanner delegate) { + super(delegate); + } + @Override public boolean isFilterDone() { return false; @@ -45,11 +49,6 @@ public abstract class BaseRegionScanner implements RegionScanner { } @Override - public long getMvccReadPoint() { - return Long.MAX_VALUE; - } - - @Override public boolean nextRaw(List<Cell> result) throws IOException { return next(result); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/9ab519a3/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java index 1f1ba36..d03d0e9 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java @@ -314,15 +314,10 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { aggResults.add(keyValue); } // scanner using the non spillable, memory-only implementation - return new BaseRegionScanner() { + return new BaseRegionScanner(s) { private int index = 0; @Override - public HRegionInfo getRegionInfo() { - return s.getRegionInfo(); - } - - @Override public void close() throws IOException { try { s.close(); @@ -338,11 +333,6 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { index++; return index < aggResults.size(); } - - @Override - public long getMaxResultSize() { - return s.getMaxResultSize(); - } }; } @@ -463,21 +453,11 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { logger.debug(LogUtil.addCustomAnnotations("Grouped aggregation over ordered rows with scan " + scan + ", group by " + expressions + ", aggregators " + aggregators, ScanUtil.getCustomAnnotations(scan))); } - return new BaseRegionScanner() { + return new BaseRegionScanner(scanner) { private long rowCount = 0; private ImmutableBytesWritable currentKey = null; @Override - public HRegionInfo getRegionInfo() { - return scanner.getRegionInfo(); - } - - @Override - public void close() throws IOException { - scanner.close(); - } - - @Override public boolean next(List<Cell> results) throws IOException { boolean hasMore; boolean atLimit; @@ -559,11 +539,6 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { currentKey = null; return false; } - - @Override - public long getMaxResultSize() { - return scanner.getMaxResultSize(); - } }; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/9ab519a3/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java index 2a49ad9..f2e6136 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java @@ -620,7 +620,8 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso isRowTimestampKV == null ? false : Boolean.TRUE.equals(PBoolean.INSTANCE.toObject( isRowTimestampKV.getValueArray(), isRowTimestampKV.getValueOffset(), isRowTimestampKV.getValueLength())); - PColumn column = new PColumnImpl(colName, famName, dataType, maxLength, scale, isNullable, position-1, sortOrder, arraySize, viewConstant, isViewReferenced, expressionStr, isRowTimestamp); + + PColumn column = new PColumnImpl(colName, famName, dataType, maxLength, scale, isNullable, position-1, sortOrder, arraySize, viewConstant, isViewReferenced, expressionStr, isRowTimestamp, false); columns.add(column); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/9ab519a3/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java index 58af8b8..8c588ba 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java @@ -22,6 +22,7 @@ import java.sql.DriverManager; import java.sql.SQLException; import java.util.ArrayList; import java.util.List; +import java.util.Properties; import java.util.TimerTask; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -252,7 +253,10 @@ public class MetaDataRegionObserver extends BaseRegionObserver { } if (conn == null) { - conn = DriverManager.getConnection(getJdbcUrl(env)).unwrap(PhoenixConnection.class); + final Properties props = new Properties(); + // don't run a second index populations upsert select + props.setProperty(QueryServices.INDEX_POPULATION_SLEEP_TIME, "0"); + conn = DriverManager.getConnection(getJdbcUrl(env), props).unwrap(PhoenixConnection.class); } String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTable); @@ -270,7 +274,7 @@ public class MetaDataRegionObserver extends BaseRegionObserver { QueryServices.INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_TIME_ATTRIB, QueryServicesOptions.DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_TIME); long timeStamp = Math.max(0, disabledTimeStampVal - overlapTime); - + LOG.info("Starting to build index=" + indexPTable.getName() + " from timestamp=" + timeStamp); client.buildPartialIndexFromTimeStamp(indexPTable, new TableRef(dataPTable, Long.MAX_VALUE, timeStamp)); http://git-wip-us.apache.org/repos/asf/phoenix/blob/9ab519a3/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java index e4fd584..5df5755 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java @@ -252,7 +252,7 @@ public class ScanRegionObserver extends BaseScannerRegionObserver { } finally { region.closeRegionOperation(); } - return new BaseRegionScanner() { + return new BaseRegionScanner(s) { private Tuple tuple = firstTuple; @Override @@ -261,11 +261,6 @@ public class ScanRegionObserver extends BaseScannerRegionObserver { } @Override - public HRegionInfo getRegionInfo() { - return s.getRegionInfo(); - } - - @Override public boolean next(List<Cell> results) throws IOException { try { if (isFilterDone()) { @@ -300,11 +295,6 @@ public class ScanRegionObserver extends BaseScannerRegionObserver { } } } - - @Override - public long getMaxResultSize() { - return s.getMaxResultSize(); - } }; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/9ab519a3/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java index 1a52f8e..9e681ab 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java @@ -553,25 +553,15 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver } final KeyValue aggKeyValue = keyValue; - RegionScanner scanner = new BaseRegionScanner() { + RegionScanner scanner = new BaseRegionScanner(innerScanner) { private boolean done = !hadAny; @Override - public HRegionInfo getRegionInfo() { - return innerScanner.getRegionInfo(); - } - - @Override public boolean isFilterDone() { return done; } @Override - public void close() throws IOException { - innerScanner.close(); - } - - @Override public boolean next(List<Cell> results) throws IOException { if (done) return false; done = true; @@ -686,7 +676,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver final KeyValue aggKeyValue = KeyValueUtil.newKeyValue(UNGROUPED_AGG_ROW_KEY, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, AGG_TIMESTAMP, rowCountBytes, 0, rowCountBytes.length); - RegionScanner scanner = new BaseRegionScanner() { + RegionScanner scanner = new BaseRegionScanner(innerScanner) { @Override public HRegionInfo getRegionInfo() { return region.getRegionInfo(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/9ab519a3/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/PTableProtos.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/PTableProtos.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/PTableProtos.java index be8d7e2..f74ed0b 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/PTableProtos.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/PTableProtos.java @@ -259,6 +259,16 @@ public final class PTableProtos { * <code>optional bool isRowTimestamp = 13;</code> */ boolean getIsRowTimestamp(); + + // optional bool isDynamic = 14; + /** + * <code>optional bool isDynamic = 14;</code> + */ + boolean hasIsDynamic(); + /** + * <code>optional bool isDynamic = 14;</code> + */ + boolean getIsDynamic(); } /** * Protobuf type {@code PColumn} @@ -376,6 +386,11 @@ public final class PTableProtos { isRowTimestamp_ = input.readBool(); break; } + case 112: { + bitField0_ |= 0x00002000; + isDynamic_ = input.readBool(); + break; + } } } } catch (com.google.protobuf.InvalidProtocolBufferException e) { @@ -678,6 +693,22 @@ public final class PTableProtos { return isRowTimestamp_; } + // optional bool isDynamic = 14; + public static final int ISDYNAMIC_FIELD_NUMBER = 14; + private boolean isDynamic_; + /** + * <code>optional bool isDynamic = 14;</code> + */ + public boolean hasIsDynamic() { + return ((bitField0_ & 0x00002000) == 0x00002000); + } + /** + * <code>optional bool isDynamic = 14;</code> + */ + public boolean getIsDynamic() { + return isDynamic_; + } + private void initFields() { columnNameBytes_ = com.google.protobuf.ByteString.EMPTY; familyNameBytes_ = com.google.protobuf.ByteString.EMPTY; @@ -692,6 +723,7 @@ public final class PTableProtos { viewReferenced_ = false; expression_ = ""; isRowTimestamp_ = false; + isDynamic_ = false; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -764,6 +796,9 @@ public final class PTableProtos { if (((bitField0_ & 0x00001000) == 0x00001000)) { output.writeBool(13, isRowTimestamp_); } + if (((bitField0_ & 0x00002000) == 0x00002000)) { + output.writeBool(14, isDynamic_); + } getUnknownFields().writeTo(output); } @@ -825,6 +860,10 @@ public final class PTableProtos { size += com.google.protobuf.CodedOutputStream .computeBoolSize(13, isRowTimestamp_); } + if (((bitField0_ & 0x00002000) == 0x00002000)) { + size += com.google.protobuf.CodedOutputStream + .computeBoolSize(14, isDynamic_); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -913,6 +952,11 @@ public final class PTableProtos { result = result && (getIsRowTimestamp() == other.getIsRowTimestamp()); } + result = result && (hasIsDynamic() == other.hasIsDynamic()); + if (hasIsDynamic()) { + result = result && (getIsDynamic() + == other.getIsDynamic()); + } result = result && getUnknownFields().equals(other.getUnknownFields()); return result; @@ -978,6 +1022,10 @@ public final class PTableProtos { hash = (37 * hash) + ISROWTIMESTAMP_FIELD_NUMBER; hash = (53 * hash) + hashBoolean(getIsRowTimestamp()); } + if (hasIsDynamic()) { + hash = (37 * hash) + ISDYNAMIC_FIELD_NUMBER; + hash = (53 * hash) + hashBoolean(getIsDynamic()); + } hash = (29 * hash) + getUnknownFields().hashCode(); memoizedHashCode = hash; return hash; @@ -1113,6 +1161,8 @@ public final class PTableProtos { bitField0_ = (bitField0_ & ~0x00000800); isRowTimestamp_ = false; bitField0_ = (bitField0_ & ~0x00001000); + isDynamic_ = false; + bitField0_ = (bitField0_ & ~0x00002000); return this; } @@ -1193,6 +1243,10 @@ public final class PTableProtos { to_bitField0_ |= 0x00001000; } result.isRowTimestamp_ = isRowTimestamp_; + if (((from_bitField0_ & 0x00002000) == 0x00002000)) { + to_bitField0_ |= 0x00002000; + } + result.isDynamic_ = isDynamic_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -1252,6 +1306,9 @@ public final class PTableProtos { if (other.hasIsRowTimestamp()) { setIsRowTimestamp(other.getIsRowTimestamp()); } + if (other.hasIsDynamic()) { + setIsDynamic(other.getIsDynamic()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -1819,6 +1876,39 @@ public final class PTableProtos { return this; } + // optional bool isDynamic = 14; + private boolean isDynamic_ ; + /** + * <code>optional bool isDynamic = 14;</code> + */ + public boolean hasIsDynamic() { + return ((bitField0_ & 0x00002000) == 0x00002000); + } + /** + * <code>optional bool isDynamic = 14;</code> + */ + public boolean getIsDynamic() { + return isDynamic_; + } + /** + * <code>optional bool isDynamic = 14;</code> + */ + public Builder setIsDynamic(boolean value) { + bitField0_ |= 0x00002000; + isDynamic_ = value; + onChanged(); + return this; + } + /** + * <code>optional bool isDynamic = 14;</code> + */ + public Builder clearIsDynamic() { + bitField0_ = (bitField0_ & ~0x00002000); + isDynamic_ = false; + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:PColumn) } @@ -6909,38 +6999,39 @@ public final class PTableProtos { descriptor; static { java.lang.String[] descriptorData = { - "\n\014PTable.proto\032\021PGuidePosts.proto\"\223\002\n\007PC" + + "\n\014PTable.proto\032\021PGuidePosts.proto\"\246\002\n\007PC" + "olumn\022\027\n\017columnNameBytes\030\001 \002(\014\022\027\n\017family" + "NameBytes\030\002 \001(\014\022\020\n\010dataType\030\003 \002(\t\022\021\n\tmax" + "Length\030\004 \001(\005\022\r\n\005scale\030\005 \001(\005\022\020\n\010nullable\030" + "\006 \002(\010\022\020\n\010position\030\007 \002(\005\022\021\n\tsortOrder\030\010 \002" + "(\005\022\021\n\tarraySize\030\t \001(\005\022\024\n\014viewConstant\030\n " + "\001(\014\022\026\n\016viewReferenced\030\013 \001(\010\022\022\n\nexpressio" + - "n\030\014 \001(\t\022\026\n\016isRowTimestamp\030\r \001(\010\"\232\001\n\013PTab" + - "leStats\022\013\n\003key\030\001 \002(\014\022\016\n\006values\030\002 \003(\014\022\033\n\023" + - "guidePostsByteCount\030\003 \001(\003\022\025\n\rkeyBytesCou", - "nt\030\004 \001(\003\022\027\n\017guidePostsCount\030\005 \001(\005\022!\n\013pGu" + - "idePosts\030\006 \001(\0132\014.PGuidePosts\"\244\005\n\006PTable\022" + - "\027\n\017schemaNameBytes\030\001 \002(\014\022\026\n\016tableNameByt" + - "es\030\002 \002(\014\022\036\n\ttableType\030\003 \002(\0162\013.PTableType" + - "\022\022\n\nindexState\030\004 \001(\t\022\026\n\016sequenceNumber\030\005" + - " \002(\003\022\021\n\ttimeStamp\030\006 \002(\003\022\023\n\013pkNameBytes\030\007" + - " \001(\014\022\021\n\tbucketNum\030\010 \002(\005\022\031\n\007columns\030\t \003(\013" + - "2\010.PColumn\022\030\n\007indexes\030\n \003(\0132\007.PTable\022\027\n\017" + - "isImmutableRows\030\013 \002(\010\022 \n\nguidePosts\030\014 \003(" + - "\0132\014.PTableStats\022\032\n\022dataTableNameBytes\030\r ", - "\001(\014\022\031\n\021defaultFamilyName\030\016 \001(\014\022\022\n\ndisabl" + - "eWAL\030\017 \002(\010\022\023\n\013multiTenant\030\020 \002(\010\022\020\n\010viewT" + - "ype\030\021 \001(\014\022\025\n\rviewStatement\030\022 \001(\014\022\025\n\rphys" + - "icalNames\030\023 \003(\014\022\020\n\010tenantId\030\024 \001(\014\022\023\n\013vie" + - "wIndexId\030\025 \001(\005\022\021\n\tindexType\030\026 \001(\014\022\026\n\016sta" + - "tsTimeStamp\030\027 \001(\003\022\022\n\nstoreNulls\030\030 \001(\010\022\027\n" + - "\017baseColumnCount\030\031 \001(\005\022\036\n\026rowKeyOrderOpt" + - "imizable\030\032 \001(\010\022\025\n\rtransactional\030\033 \001(\010\022\034\n" + - "\024updateCacheFrequency\030\034 \001(\003*A\n\nPTableTyp" + - "e\022\n\n\006SYSTEM\020\000\022\010\n\004USER\020\001\022\010\n\004VIEW\020\002\022\t\n\005IND", - "EX\020\003\022\010\n\004JOIN\020\004B@\n(org.apache.phoenix.cop" + - "rocessor.generatedB\014PTableProtosH\001\210\001\001\240\001\001" + "n\030\014 \001(\t\022\026\n\016isRowTimestamp\030\r \001(\010\022\021\n\tisDyn" + + "amic\030\016 \001(\010\"\232\001\n\013PTableStats\022\013\n\003key\030\001 \002(\014\022" + + "\016\n\006values\030\002 \003(\014\022\033\n\023guidePostsByteCount\030\003", + " \001(\003\022\025\n\rkeyBytesCount\030\004 \001(\003\022\027\n\017guidePost" + + "sCount\030\005 \001(\005\022!\n\013pGuidePosts\030\006 \001(\0132\014.PGui" + + "dePosts\"\244\005\n\006PTable\022\027\n\017schemaNameBytes\030\001 " + + "\002(\014\022\026\n\016tableNameBytes\030\002 \002(\014\022\036\n\ttableType" + + "\030\003 \002(\0162\013.PTableType\022\022\n\nindexState\030\004 \001(\t\022" + + "\026\n\016sequenceNumber\030\005 \002(\003\022\021\n\ttimeStamp\030\006 \002" + + "(\003\022\023\n\013pkNameBytes\030\007 \001(\014\022\021\n\tbucketNum\030\010 \002" + + "(\005\022\031\n\007columns\030\t \003(\0132\010.PColumn\022\030\n\007indexes" + + "\030\n \003(\0132\007.PTable\022\027\n\017isImmutableRows\030\013 \002(\010" + + "\022 \n\nguidePosts\030\014 \003(\0132\014.PTableStats\022\032\n\022da", + "taTableNameBytes\030\r \001(\014\022\031\n\021defaultFamilyN" + + "ame\030\016 \001(\014\022\022\n\ndisableWAL\030\017 \002(\010\022\023\n\013multiTe" + + "nant\030\020 \002(\010\022\020\n\010viewType\030\021 \001(\014\022\025\n\rviewStat" + + "ement\030\022 \001(\014\022\025\n\rphysicalNames\030\023 \003(\014\022\020\n\010te" + + "nantId\030\024 \001(\014\022\023\n\013viewIndexId\030\025 \001(\005\022\021\n\tind" + + "exType\030\026 \001(\014\022\026\n\016statsTimeStamp\030\027 \001(\003\022\022\n\n" + + "storeNulls\030\030 \001(\010\022\027\n\017baseColumnCount\030\031 \001(" + + "\005\022\036\n\026rowKeyOrderOptimizable\030\032 \001(\010\022\025\n\rtra" + + "nsactional\030\033 \001(\010\022\034\n\024updateCacheFrequency" + + "\030\034 \001(\003*A\n\nPTableType\022\n\n\006SYSTEM\020\000\022\010\n\004USER", + "\020\001\022\010\n\004VIEW\020\002\022\t\n\005INDEX\020\003\022\010\n\004JOIN\020\004B@\n(org" + + ".apache.phoenix.coprocessor.generatedB\014P" + + "TableProtosH\001\210\001\001\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -6952,7 +7043,7 @@ public final class PTableProtos { internal_static_PColumn_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_PColumn_descriptor, - new java.lang.String[] { "ColumnNameBytes", "FamilyNameBytes", "DataType", "MaxLength", "Scale", "Nullable", "Position", "SortOrder", "ArraySize", "ViewConstant", "ViewReferenced", "Expression", "IsRowTimestamp", }); + new java.lang.String[] { "ColumnNameBytes", "FamilyNameBytes", "DataType", "MaxLength", "Scale", "Nullable", "Position", "SortOrder", "ArraySize", "ViewConstant", "ViewReferenced", "Expression", "IsRowTimestamp", "IsDynamic", }); internal_static_PTableStats_descriptor = getDescriptor().getMessageTypes().get(1); internal_static_PTableStats_fieldAccessorTable = new http://git-wip-us.apache.org/repos/asf/phoenix/blob/9ab519a3/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java index aa217ca..07a1e52 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java @@ -52,6 +52,7 @@ import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult; import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.exception.SQLExceptionInfo; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; +import org.apache.phoenix.hbase.index.util.KeyValueBuilder; import org.apache.phoenix.index.IndexMaintainer; import org.apache.phoenix.index.IndexMetaDataCacheClient; import org.apache.phoenix.index.PhoenixIndexCodec; @@ -569,7 +570,7 @@ public class MutationState implements SQLCloseable { List<Mutation> indexMutations; try { indexMutations = - IndexUtil.generateIndexData(table, index, mutationsPertainingToIndex, + IndexUtil.generateIndexData(table, index, mutationsPertainingToIndex, connection.getKeyValueBuilder(), connection); // we may also have to include delete mutations for immutable tables if we are not processing all the tables in the mutations map if (!sendAll) { @@ -719,37 +720,35 @@ public class MutationState implements SQLCloseable { long serverTimeStamp = tableRef.getTimeStamp(); // If we're auto committing, we've already validated the schema when we got the ColumnResolver, // so no need to do it again here. - if (!connection.getAutoCommit()) { - PTable table = tableRef.getTable(); - MetaDataMutationResult result = client.updateCache(table.getSchemaName().getString(), table.getTableName().getString()); - PTable resolvedTable = result.getTable(); - if (resolvedTable == null) { - throw new TableNotFoundException(table.getSchemaName().getString(), table.getTableName().getString()); - } - // Always update tableRef table as the one we've cached may be out of date since when we executed - // the UPSERT VALUES call and updated in the cache before this. - tableRef.setTable(resolvedTable); - long timestamp = result.getMutationTime(); - if (timestamp != QueryConstants.UNSET_TIMESTAMP) { - serverTimeStamp = timestamp; - if (result.wasUpdated()) { - // TODO: use bitset? - PColumn[] columns = new PColumn[resolvedTable.getColumns().size()]; - for (Map.Entry<ImmutableBytesPtr,RowMutationState> rowEntry : rowKeyToColumnMap.entrySet()) { - RowMutationState valueEntry = rowEntry.getValue(); - if (valueEntry != null) { - Map<PColumn, byte[]> colValues = valueEntry.getColumnValues(); - if (colValues != PRow.DELETE_MARKER) { - for (PColumn column : colValues.keySet()) { - columns[column.getPosition()] = column; - } + PTable table = tableRef.getTable(); + MetaDataMutationResult result = client.updateCache(table.getSchemaName().getString(), table.getTableName().getString()); + PTable resolvedTable = result.getTable(); + if (resolvedTable == null) { + throw new TableNotFoundException(table.getSchemaName().getString(), table.getTableName().getString()); + } + // Always update tableRef table as the one we've cached may be out of date since when we executed + // the UPSERT VALUES call and updated in the cache before this. + tableRef.setTable(resolvedTable); + long timestamp = result.getMutationTime(); + if (timestamp != QueryConstants.UNSET_TIMESTAMP) { + serverTimeStamp = timestamp; + if (result.wasUpdated()) { + List<PColumn> columns = Lists.newArrayListWithExpectedSize(table.getColumns().size()); + for (Map.Entry<ImmutableBytesPtr,RowMutationState> rowEntry : rowKeyToColumnMap.entrySet()) { + RowMutationState valueEntry = rowEntry.getValue(); + if (valueEntry != null) { + Map<PColumn, byte[]> colValues = valueEntry.getColumnValues(); + if (colValues != PRow.DELETE_MARKER) { + for (PColumn column : colValues.keySet()) { + if (!column.isDynamic()) + columns.add(column); } } } - for (PColumn column : columns) { - if (column != null) { - resolvedTable.getColumnFamily(column.getFamilyName().getString()).getColumn(column.getName().getString()); - } + } + for (PColumn column : columns) { + if (column != null) { + resolvedTable.getColumnFamily(column.getFamilyName().getString()).getColumn(column.getName().getString()); } } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/9ab519a3/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java index b215e55..cfc9ec2 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java @@ -203,6 +203,8 @@ public interface QueryServices extends SQLCloseable { public static final String HCONNECTION_POOL_MAX_SIZE = "hbase.hconnection.threads.max"; public static final String HTABLE_MAX_THREADS = "hbase.htable.threads.max"; + // time to wait before running second index population upsert select (so that any pending batches of rows on region server are also written to index) + public static final String INDEX_POPULATION_SLEEP_TIME = "phoenix.index.population.wait.time"; /** * Get executor service used for parallel scans */ http://git-wip-us.apache.org/repos/asf/phoenix/blob/9ab519a3/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java index b50e135..69a1ff7 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java @@ -37,6 +37,7 @@ import static org.apache.phoenix.query.QueryServices.GROUPBY_SPILLABLE_ATTRIB; import static org.apache.phoenix.query.QueryServices.GROUPBY_SPILL_FILES_ATTRIB; import static org.apache.phoenix.query.QueryServices.IMMUTABLE_ROWS_ATTRIB; import static org.apache.phoenix.query.QueryServices.INDEX_MUTATE_BATCH_SIZE_THRESHOLD_ATTRIB; +import static org.apache.phoenix.query.QueryServices.INDEX_POPULATION_SLEEP_TIME; import static org.apache.phoenix.query.QueryServices.KEEP_ALIVE_MS_ATTRIB; import static org.apache.phoenix.query.QueryServices.MASTER_INFO_PORT_ATTRIB; import static org.apache.phoenix.query.QueryServices.MAX_CLIENT_METADATA_CACHE_SIZE_ATTRIB; @@ -220,6 +221,8 @@ public class QueryServicesOptions { public static final boolean DEFAULT_RETURN_SEQUENCE_VALUES = false; public static final String DEFAULT_EXTRA_JDBC_ARGUMENTS = ""; + + public static final long DEFAULT_INDEX_POPULATION_SLEEP_TIME = 5000; // QueryServer defaults -- ensure ThinClientUtil is also updated since phoenix-server-client // doesn't depend on phoenix-core. @@ -427,7 +430,6 @@ public class QueryServicesOptions { return set(GROUPBY_SPILL_FILES_ATTRIB, num); } - private QueryServicesOptions set(String name, boolean value) { config.set(name, Boolean.toString(value)); return this; @@ -637,4 +639,9 @@ public class QueryServicesOptions { return this; } + public QueryServicesOptions setDefaultIndexPopulationWaitTime(long waitTime) { + config.setLong(INDEX_POPULATION_SLEEP_TIME, waitTime); + return this; + } + } http://git-wip-us.apache.org/repos/asf/phoenix/blob/9ab519a3/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateColumn.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateColumn.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateColumn.java index c904120..a60229e 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateColumn.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateColumn.java @@ -85,4 +85,9 @@ public class DelegateColumn extends DelegateDatum implements PColumn { public String toString() { return getDelegate().toString(); } + + @Override + public boolean isDynamic() { + return getDelegate().isDynamic(); + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/9ab519a3/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java index d559842..e8d995c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java @@ -114,9 +114,6 @@ import java.util.Map; import java.util.Properties; import java.util.Set; -import co.cask.tephra.TxConstants; - -import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionLocation; @@ -125,17 +122,15 @@ import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; -import org.apache.phoenix.compile.BaseMutationPlan; import org.apache.phoenix.compile.ColumnResolver; import org.apache.phoenix.compile.FromCompiler; import org.apache.phoenix.compile.IndexExpressionCompiler; import org.apache.phoenix.compile.MutationPlan; import org.apache.phoenix.compile.PostDDLCompiler; import org.apache.phoenix.compile.PostIndexDDLCompiler; -import org.apache.phoenix.compile.QueryPlan; +import org.apache.phoenix.compile.PostLocalIndexDDLCompiler; import org.apache.phoenix.compile.StatementContext; import org.apache.phoenix.compile.StatementNormalizer; import org.apache.phoenix.coprocessor.BaseScannerRegionObserver; @@ -153,7 +148,6 @@ import org.apache.phoenix.index.IndexMaintainer; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; import org.apache.phoenix.jdbc.PhoenixStatement; -import org.apache.phoenix.jdbc.PhoenixStatement.Operation; import org.apache.phoenix.parse.AddColumnStatement; import org.apache.phoenix.parse.AlterIndexStatement; import org.apache.phoenix.parse.ColumnDef; @@ -210,6 +204,8 @@ import org.apache.phoenix.util.UpgradeUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import co.cask.tephra.TxConstants; + import com.google.common.base.Objects; import com.google.common.collect.Iterators; import com.google.common.collect.ListMultimap; @@ -821,7 +817,7 @@ public class MetaDataClient { } PColumn column = new PColumnImpl(PNameFactory.newName(columnName), familyName, def.getDataType(), - def.getMaxLength(), def.getScale(), isNull, position, sortOrder, def.getArraySize(), null, false, def.getExpression(), isRowTimestamp); + def.getMaxLength(), def.getScale(), isNull, position, sortOrder, def.getArraySize(), null, false, def.getExpression(), isRowTimestamp, false); return column; } catch (IllegalArgumentException e) { // Based on precondition check in constructor throw new SQLException(e); @@ -1021,89 +1017,68 @@ public class MetaDataClient { try { connection.setAutoCommit(true); MutationPlan mutationPlan; - - // For local indexes, we optimize the initial index population by *not* sending Puts over - // the wire for the index rows, as we don't need to do that. Instead, we tap into our - // region observer to generate the index rows based on the data rows as we scan if (index.getIndexType() == IndexType.LOCAL) { - try (final PhoenixStatement statement = new PhoenixStatement(connection)) { - String tableName = getFullTableName(dataTableRef); - String query = "SELECT count(*) FROM " + tableName; - final QueryPlan plan = statement.compileQuery(query); - TableRef tableRef = plan.getTableRef(); - // Set attribute on scan that UngroupedAggregateRegionObserver will switch on. - // We'll detect that this attribute was set the server-side and write the index - // rows per region as a result. The value of the attribute will be our persisted - // index maintainers. - // Define the LOCAL_INDEX_BUILD as a new static in BaseScannerRegionObserver - Scan scan = plan.getContext().getScan(); - try { - if(ScanUtil.isDefaultTimeRange(scan.getTimeRange())) { - Long scn = connection.getSCN(); - if (scn == null) { - scn = plan.getContext().getCurrentTime(); - } - scan.setTimeRange(dataTableRef.getLowerBoundTimeStamp(),scn); - } - } catch (IOException e) { - throw new SQLException(e); - } - ImmutableBytesWritable ptr = new ImmutableBytesWritable(); - final PTable dataTable = tableRef.getTable(); - for(PTable idx: dataTable.getIndexes()) { - if(idx.getName().equals(index.getName())) { - index = idx; - break; - } - } - List<PTable> indexes = Lists.newArrayListWithExpectedSize(1); - // Only build newly created index. - indexes.add(index); - IndexMaintainer.serialize(dataTable, ptr, indexes, plan.getContext().getConnection()); - scan.setAttribute(BaseScannerRegionObserver.LOCAL_INDEX_BUILD, ByteUtil.copyKeyBytesIfNecessary(ptr)); - // By default, we'd use a FirstKeyOnly filter as nothing else needs to be projected for count(*). - // However, in this case, we need to project all of the data columns that contribute to the index. - IndexMaintainer indexMaintainer = index.getIndexMaintainer(dataTable, connection); - for (ColumnReference columnRef : indexMaintainer.getAllColumns()) { - scan.addColumn(columnRef.getFamily(), columnRef.getQualifier()); - } - - // Go through MutationPlan abstraction so that we can create local indexes - // with a connectionless connection (which makes testing easier). - mutationPlan = new BaseMutationPlan(plan.getContext(), Operation.UPSERT) { - - @Override - public MutationState execute() throws SQLException { - connection.getMutationState().commitDDLFence(dataTable); - Cell kv = plan.iterator().next().getValue(0); - ImmutableBytesWritable tmpPtr = new ImmutableBytesWritable(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength()); - // A single Cell will be returned with the count(*) - we decode that here - long rowCount = PLong.INSTANCE.getCodec().decodeLong(tmpPtr, SortOrder.getDefault()); - // The contract is to return a MutationState that contains the number of rows modified. In this - // case, it's the number of rows in the data table which corresponds to the number of index - // rows that were added. - return new MutationState(0, connection, rowCount); - } - - }; - } + PostLocalIndexDDLCompiler compiler = + new PostLocalIndexDDLCompiler(connection, getFullTableName(dataTableRef)); + mutationPlan = compiler.compile(index); } else { PostIndexDDLCompiler compiler = new PostIndexDDLCompiler(connection, dataTableRef); mutationPlan = compiler.compile(index); - try { - Long scn = connection.getSCN(); + } + Scan scan = mutationPlan.getContext().getScan(); + Long scn = connection.getSCN(); + try { + if (ScanUtil.isDefaultTimeRange(scan.getTimeRange())) { if (scn == null) { scn = mutationPlan.getContext().getCurrentTime(); } - mutationPlan.getContext().getScan().setTimeRange(dataTableRef.getLowerBoundTimeStamp(), scn); + scan.setTimeRange(dataTableRef.getLowerBoundTimeStamp(), scn); + } + } catch (IOException e) { + throw new SQLException(e); + } + + // execute index population upsert select + long startTime = System.currentTimeMillis(); + MutationState state = connection.getQueryServices().updateData(mutationPlan); + long firstUpsertSelectTime = System.currentTimeMillis() - startTime; + + // for global indexes on non transactional tables we might have to + // run a second index population upsert select to handle data rows + // that were being written on the server while the index was created + long sleepTime = + connection + .getQueryServices() + .getProps() + .getLong(QueryServices.INDEX_POPULATION_SLEEP_TIME, + QueryServicesOptions.DEFAULT_INDEX_POPULATION_SLEEP_TIME); + if (!dataTableRef.getTable().isTransactional() && sleepTime > 0) { + long delta = sleepTime - firstUpsertSelectTime; + if (delta > 0) { + try { + Thread.sleep(delta); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new SQLExceptionInfo.Builder(SQLExceptionCode.INTERRUPTED_EXCEPTION) + .setRootCause(e).build().buildException(); + } + } + // set the min timestamp of second index upsert select some time before the index + // was created + long minTimestamp = index.getTimeStamp() - firstUpsertSelectTime; + try { + mutationPlan.getContext().getScan().setTimeRange(minTimestamp, scn); } catch (IOException e) { throw new SQLException(e); } + MutationState newMutationState = + connection.getQueryServices().updateData(mutationPlan); + state.join(newMutationState); } - MutationState state = connection.getQueryServices().updateData(mutationPlan); + indexStatement = FACTORY.alterIndex(FACTORY.namedTable(null, - TableName.create(index.getSchemaName().getString(), index.getTableName().getString())), - dataTableRef.getTable().getTableName().getString(), false, PIndexState.ACTIVE); + TableName.create(index.getSchemaName().getString(), index.getTableName().getString())), + dataTableRef.getTable().getTableName().getString(), false, PIndexState.ACTIVE); alterIndex(indexStatement); return state; http://git-wip-us.apache.org/repos/asf/phoenix/blob/9ab519a3/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumn.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumn.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumn.java index 357ce6f..0f5fa44 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumn.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumn.java @@ -58,4 +58,6 @@ public interface PColumn extends PDatum { * @return whether this column represents/stores the hbase cell timestamp. */ boolean isRowTimestamp(); + + boolean isDynamic(); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/9ab519a3/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnImpl.java index cff276b..a556f76 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PColumnImpl.java @@ -39,6 +39,7 @@ public class PColumnImpl implements PColumn { private boolean isViewReferenced; private String expressionStr; private boolean isRowTimestamp; + private boolean isDynamic; public PColumnImpl() { } @@ -50,13 +51,13 @@ public class PColumnImpl implements PColumn { Integer scale, boolean nullable, int position, - SortOrder sortOrder, Integer arrSize, byte[] viewConstant, boolean isViewReferenced, String expressionStr, boolean isRowTimestamp) { - init(name, familyName, dataType, maxLength, scale, nullable, position, sortOrder, arrSize, viewConstant, isViewReferenced, expressionStr, isRowTimestamp); + SortOrder sortOrder, Integer arrSize, byte[] viewConstant, boolean isViewReferenced, String expressionStr, boolean isRowTimestamp, boolean isDynamic) { + init(name, familyName, dataType, maxLength, scale, nullable, position, sortOrder, arrSize, viewConstant, isViewReferenced, expressionStr, isRowTimestamp, isDynamic); } public PColumnImpl(PColumn column, int position) { this(column.getName(), column.getFamilyName(), column.getDataType(), column.getMaxLength(), - column.getScale(), column.isNullable(), position, column.getSortOrder(), column.getArraySize(), column.getViewConstant(), column.isViewReferenced(), column.getExpressionStr(), column.isRowTimestamp()); + column.getScale(), column.isNullable(), position, column.getSortOrder(), column.getArraySize(), column.getViewConstant(), column.isViewReferenced(), column.getExpressionStr(), column.isRowTimestamp(), column.isDynamic()); } private void init(PName name, @@ -68,7 +69,7 @@ public class PColumnImpl implements PColumn { int position, SortOrder sortOrder, Integer arrSize, - byte[] viewConstant, boolean isViewReferenced, String expressionStr, boolean isRowTimestamp) { + byte[] viewConstant, boolean isViewReferenced, String expressionStr, boolean isRowTimestamp, boolean isDynamic) { Preconditions.checkNotNull(sortOrder); this.dataType = dataType; if (familyName == null) { @@ -92,6 +93,7 @@ public class PColumnImpl implements PColumn { this.isViewReferenced = isViewReferenced; this.expressionStr = expressionStr; this.isRowTimestamp = isRowTimestamp; + this.isDynamic = isDynamic; } @Override @@ -198,6 +200,11 @@ public class PColumnImpl implements PColumn { public boolean isRowTimestamp() { return isRowTimestamp; } + + @Override + public boolean isDynamic() { + return isDynamic; + } /** * Create a PColumn instance from PBed PColumn instance @@ -240,8 +247,12 @@ public class PColumnImpl implements PColumn { expressionStr = column.getExpression(); } boolean isRowTimestamp = column.getIsRowTimestamp(); + boolean isDynamic = false; + if (column.hasIsDynamic()) { + isDynamic = column.getIsDynamic(); + } return new PColumnImpl(columnName, familyName, dataType, maxLength, scale, nullable, position, sortOrder, - arraySize, viewConstant, isViewReferenced, expressionStr, isRowTimestamp); + arraySize, viewConstant, isViewReferenced, expressionStr, isRowTimestamp, isDynamic); } public static PTableProtos.PColumn toProto(PColumn column) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/9ab519a3/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaDataImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaDataImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaDataImpl.java index 66b4af3..413d116 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaDataImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PMetaDataImpl.java @@ -414,7 +414,7 @@ public class PMetaDataImpl implements PMetaData { // Update position of columns that follow removed column for (int i = position+1; i < oldColumns.size(); i++) { PColumn oldColumn = oldColumns.get(i); - PColumn newColumn = new PColumnImpl(oldColumn.getName(), oldColumn.getFamilyName(), oldColumn.getDataType(), oldColumn.getMaxLength(), oldColumn.getScale(), oldColumn.isNullable(), i-1+positionOffset, oldColumn.getSortOrder(), oldColumn.getArraySize(), oldColumn.getViewConstant(), oldColumn.isViewReferenced(), null, oldColumn.isRowTimestamp()); + PColumn newColumn = new PColumnImpl(oldColumn.getName(), oldColumn.getFamilyName(), oldColumn.getDataType(), oldColumn.getMaxLength(), oldColumn.getScale(), oldColumn.isNullable(), i-1+positionOffset, oldColumn.getSortOrder(), oldColumn.getArraySize(), oldColumn.getViewConstant(), oldColumn.isViewReferenced(), null, oldColumn.isRowTimestamp(), oldColumn.isDynamic()); columns.add(newColumn); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/9ab519a3/phoenix-core/src/main/java/org/apache/phoenix/schema/SaltingUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/SaltingUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/SaltingUtil.java index 4ac54cb..734a9ed 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/SaltingUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/SaltingUtil.java @@ -38,7 +38,7 @@ public class SaltingUtil { public static final String SALTING_COLUMN_NAME = "_SALT"; public static final String SALTED_ROW_KEY_NAME = "_SALTED_KEY"; public static final PColumnImpl SALTING_COLUMN = new PColumnImpl( - PNameFactory.newName(SALTING_COLUMN_NAME), null, PBinary.INSTANCE, 1, 0, false, 0, SortOrder.getDefault(), 0, null, false, null, false); + PNameFactory.newName(SALTING_COLUMN_NAME), null, PBinary.INSTANCE, 1, 0, false, 0, SortOrder.getDefault(), 0, null, false, null, false, false); public static final RowKeySchema VAR_BINARY_SALTED_SCHEMA = new RowKeySchemaBuilder(2) .addField(SALTING_COLUMN, false, SortOrder.getDefault()) .addField(SchemaUtil.VAR_BINARY_DATUM, false, SortOrder.getDefault()).build(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/9ab519a3/phoenix-core/src/test/java/org/apache/phoenix/execute/CorrelatePlanTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/execute/CorrelatePlanTest.java b/phoenix-core/src/test/java/org/apache/phoenix/execute/CorrelatePlanTest.java index 72f3e01..6b89187 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/execute/CorrelatePlanTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/execute/CorrelatePlanTest.java @@ -226,7 +226,7 @@ public class CorrelatePlanTest { Expression expr = LiteralExpression.newConstant(row[i]); columns.add(new PColumnImpl(PNameFactory.newName(name), PNameFactory.newName(TupleProjector.VALUE_COLUMN_FAMILY), expr.getDataType(), expr.getMaxLength(), expr.getScale(), expr.isNullable(), - i, expr.getSortOrder(), null, null, false, name, false)); + i, expr.getSortOrder(), null, null, false, name, false, false)); } try { PTable pTable = PTableImpl.makePTable(null, PName.EMPTY_NAME, PName.EMPTY_NAME, http://git-wip-us.apache.org/repos/asf/phoenix/blob/9ab519a3/phoenix-core/src/test/java/org/apache/phoenix/execute/UnnestArrayPlanTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/execute/UnnestArrayPlanTest.java b/phoenix-core/src/test/java/org/apache/phoenix/execute/UnnestArrayPlanTest.java index d508707..8b2b096 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/execute/UnnestArrayPlanTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/execute/UnnestArrayPlanTest.java @@ -116,8 +116,8 @@ public class UnnestArrayPlanTest { LiteralExpression dummy = LiteralExpression.newConstant(null, arrayType); RowKeyValueAccessor accessor = new RowKeyValueAccessor(Arrays.asList(dummy), 0); UnnestArrayPlan plan = new UnnestArrayPlan(subPlan, new RowKeyColumnExpression(dummy, accessor), withOrdinality); - PColumn elemColumn = new PColumnImpl(PNameFactory.newName("ELEM"), PNameFactory.newName(TupleProjector.VALUE_COLUMN_FAMILY), baseType, null, null, true, 0, SortOrder.getDefault(), null, null, false, "", false); - PColumn indexColumn = withOrdinality ? new PColumnImpl(PNameFactory.newName("IDX"), PNameFactory.newName(TupleProjector.VALUE_COLUMN_FAMILY), PInteger.INSTANCE, null, null, true, 0, SortOrder.getDefault(), null, null, false, "", false) : null; + PColumn elemColumn = new PColumnImpl(PNameFactory.newName("ELEM"), PNameFactory.newName(TupleProjector.VALUE_COLUMN_FAMILY), baseType, null, null, true, 0, SortOrder.getDefault(), null, null, false, "", false, false); + PColumn indexColumn = withOrdinality ? new PColumnImpl(PNameFactory.newName("IDX"), PNameFactory.newName(TupleProjector.VALUE_COLUMN_FAMILY), PInteger.INSTANCE, null, null, true, 0, SortOrder.getDefault(), null, null, false, "", false, false) : null; List<PColumn> columns = withOrdinality ? Arrays.asList(elemColumn, indexColumn) : Arrays.asList(elemColumn); ProjectedColumnExpression elemExpr = new ProjectedColumnExpression(elemColumn, columns, 0, elemColumn.getName().getString()); ProjectedColumnExpression indexExpr = withOrdinality ? new ProjectedColumnExpression(indexColumn, columns, 1, indexColumn.getName().getString()) : null; http://git-wip-us.apache.org/repos/asf/phoenix/blob/9ab519a3/phoenix-core/src/test/java/org/apache/phoenix/expression/ColumnExpressionTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/expression/ColumnExpressionTest.java b/phoenix-core/src/test/java/org/apache/phoenix/expression/ColumnExpressionTest.java index 7a299a9..7ee579c 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/expression/ColumnExpressionTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/expression/ColumnExpressionTest.java @@ -41,7 +41,7 @@ public class ColumnExpressionTest { int maxLen = 30; int scale = 5; PColumn column = new PColumnImpl(PNameFactory.newName("c1"), PNameFactory.newName("f1"), PDecimal.INSTANCE, maxLen, scale, - true, 20, SortOrder.getDefault(), 0, null, false, null, false); + true, 20, SortOrder.getDefault(), 0, null, false, null, false, false); ColumnExpression colExp = new KeyValueColumnExpression(column); ByteArrayOutputStream baos = new ByteArrayOutputStream(); DataOutputStream dOut = new DataOutputStream(baos); @@ -61,7 +61,7 @@ public class ColumnExpressionTest { public void testSerializationWithNullScale() throws Exception { int maxLen = 30; PColumn column = new PColumnImpl(PNameFactory.newName("c1"), PNameFactory.newName("f1"), PBinary.INSTANCE, maxLen, null, - true, 20, SortOrder.getDefault(), 0, null, false, null, false); + true, 20, SortOrder.getDefault(), 0, null, false, null, false, false); ColumnExpression colExp = new KeyValueColumnExpression(column); ByteArrayOutputStream baos = new ByteArrayOutputStream(); DataOutputStream dOut = new DataOutputStream(baos); @@ -81,7 +81,7 @@ public class ColumnExpressionTest { public void testSerializationWithNullMaxLength() throws Exception { int scale = 5; PColumn column = new PColumnImpl(PNameFactory.newName("c1"), PNameFactory.newName("f1"), PVarchar.INSTANCE, null, scale, - true, 20, SortOrder.getDefault(), 0, null, false, null, false); + true, 20, SortOrder.getDefault(), 0, null, false, null, false, false); ColumnExpression colExp = new KeyValueColumnExpression(column); ByteArrayOutputStream baos = new ByteArrayOutputStream(); DataOutputStream dOut = new DataOutputStream(baos); @@ -100,7 +100,7 @@ public class ColumnExpressionTest { @Test public void testSerializationWithNullScaleAndMaxLength() throws Exception { PColumn column = new PColumnImpl(PNameFactory.newName("c1"), PNameFactory.newName("f1"), PDecimal.INSTANCE, null, null, true, - 20, SortOrder.getDefault(), 0, null, false, null, false); + 20, SortOrder.getDefault(), 0, null, false, null, false, false); ColumnExpression colExp = new KeyValueColumnExpression(column); ByteArrayOutputStream baos = new ByteArrayOutputStream(); DataOutputStream dOut = new DataOutputStream(baos);
