This is an automated email from the ASF dual-hosted git repository. kadir pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push: new 881dd8d PHOENIX-5998 Paged server side ungrouped aggregate operations 881dd8d is described below commit 881dd8d6033def07d0371cebbaa5635031594277 Author: Kadir Ozdemir <kozde...@salesforce.com> AuthorDate: Wed Nov 11 16:32:24 2020 -0800 PHOENIX-5998 Paged server side ungrouped aggregate operations --- .../apache/phoenix/end2end/SpillableGroupByIT.java | 1 + .../org/apache/phoenix/end2end/UpsertSelectIT.java | 1 - .../coprocessor/BaseScannerRegionObserver.java | 6 +- .../UngroupedAggregateRegionObserver.java | 719 ++++----------------- .../UngroupedAggregateRegionScanner.java | 670 +++++++++++++++++++ .../phoenix/iterate/TableResultIterator.java | 4 + .../UngroupedAggregatingResultIterator.java | 43 +- .../org/apache/phoenix/query/QueryServices.java | 2 + .../apache/phoenix/query/QueryServicesOptions.java | 3 +- .../java/org/apache/phoenix/query/BaseTest.java | 9 +- 10 files changed, 838 insertions(+), 620 deletions(-) diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SpillableGroupByIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SpillableGroupByIT.java index d7e1b37..c6692f8 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SpillableGroupByIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SpillableGroupByIT.java @@ -80,6 +80,7 @@ public class SpillableGroupByIT extends BaseOwnClusterIT { props.put(QueryServices.STATS_COLLECTION_ENABLED, Boolean.toString(false)); props.put(QueryServices.EXPLAIN_CHUNK_COUNT_ATTRIB, Boolean.TRUE.toString()); props.put(QueryServices.EXPLAIN_ROW_COUNT_ATTRIB, Boolean.TRUE.toString()); + props.put(QueryServices.UNGROUPED_AGGREGATE_PAGE_SIZE_IN_MS, Long.toString(1000)); // Must update config before starting server setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); } diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java index c55e9b7..007d1ef 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java @@ -1680,7 +1680,6 @@ public class UpsertSelectIT extends ParallelStatsDisabledIT { fail(); } catch (SQLException e) { assertEquals(SQLExceptionCode.DATA_EXCEEDS_MAX_CAPACITY.getErrorCode(), e.getErrorCode()); - assertFalse(e.getMessage().contains(invalidValue)); assertTrue(e.getMessage().contains(columnTypeInfo)); } } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java index 712f719..423d0d3 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java @@ -74,9 +74,13 @@ abstract public class BaseScannerRegionObserver extends CompatBaseScannerRegionO public static final String GROUP_BY_LIMIT = "_GroupByLimit"; public static final String LOCAL_INDEX = "_LocalIndex"; public static final String LOCAL_INDEX_BUILD = "_LocalIndexBuild"; - // The number of index rows to be rebuild in one RPC call public static final String INDEX_REBUILD_PAGING = "_IndexRebuildPaging"; + // The number of index rows to be rebuild in one RPC call public static final String INDEX_REBUILD_PAGE_ROWS = "_IndexRebuildPageRows"; + public static final String SERVER_PAGING = "_ServerPaging"; + // The number of rows to be scanned in one RPC call + public static final String AGGREGATE_PAGE_SIZE_IN_MS = "_AggregatePageSizeInMs"; + // Index verification type done by the index tool public static final String INDEX_REBUILD_VERIFY_TYPE = "_IndexRebuildVerifyType"; public static final String INDEX_RETRY_VERIFY = "_IndexRetryVerify"; diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java index 2d58c69..c8fd915 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java @@ -21,9 +21,6 @@ import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP; import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN; import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY; import static org.apache.phoenix.query.QueryConstants.UNGROUPED_AGG_ROW_KEY; -import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_ATTRIB; -import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_BYTES_ATTRIB; -import static org.apache.phoenix.schema.PTableImpl.getColumnsToClone; import static org.apache.phoenix.schema.stats.StatisticsCollectionRunTracker.COMPACTION_UPDATE_STATS_ROW_COUNT; import static org.apache.phoenix.schema.stats.StatisticsCollectionRunTracker.CONCURRENT_UPDATE_STATS_ROW_COUNT; @@ -35,7 +32,6 @@ import java.io.IOException; import java.security.PrivilegedExceptionAction; import java.sql.SQLException; import java.util.ArrayList; -import java.util.Arrays; import java.util.List; import java.util.Optional; import java.util.Set; @@ -43,12 +39,8 @@ import java.util.concurrent.Callable; import javax.annotation.concurrent.GuardedBy; -import org.apache.phoenix.index.GlobalIndexChecker; -import org.apache.phoenix.thirdparty.com.google.common.collect.Maps; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellBuilderFactory; -import org.apache.hadoop.hbase.CellBuilderType; import org.apache.hadoop.hbase.CoprocessorEnvironment; import org.apache.hadoop.hbase.DoNotRetryIOException; import 
org.apache.hadoop.hbase.HConstants; @@ -56,11 +48,7 @@ import org.apache.hadoop.hbase.KeepDeletedCells; import org.apache.hadoop.hbase.NamespaceDescriptor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.ConnectionFactory; -import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Mutation; -import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; @@ -86,27 +74,16 @@ import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.io.WritableUtils; -import org.apache.phoenix.cache.GlobalCache; -import org.apache.phoenix.cache.TenantCache; import org.apache.phoenix.coprocessor.generated.PTableProtos; -import org.apache.phoenix.exception.DataExceedsCapacityException; import org.apache.phoenix.exception.SQLExceptionCode; -import org.apache.phoenix.exception.SQLExceptionInfo; import org.apache.phoenix.execute.TupleProjector; import org.apache.phoenix.expression.Expression; import org.apache.phoenix.expression.ExpressionType; -import org.apache.phoenix.expression.aggregator.Aggregator; -import org.apache.phoenix.expression.aggregator.Aggregators; -import org.apache.phoenix.expression.aggregator.ServerAggregators; import org.apache.phoenix.filter.AllVersionsIndexRebuildFilter; -import org.apache.phoenix.hbase.index.IndexRegionObserver; import org.apache.phoenix.hbase.index.Indexer; -import org.apache.phoenix.hbase.index.ValueGetter; import org.apache.phoenix.hbase.index.covered.update.ColumnReference; import org.apache.phoenix.hbase.index.exception.IndexWriteException; -import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder; -import 
org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; -import org.apache.phoenix.hbase.index.util.KeyValueBuilder; +import org.apache.phoenix.index.GlobalIndexChecker; import org.apache.phoenix.index.IndexMaintainer; import org.apache.phoenix.index.PhoenixIndexCodec; import org.apache.phoenix.index.PhoenixIndexFailurePolicy; @@ -116,47 +93,26 @@ import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; import org.apache.phoenix.join.HashJoinInfo; import org.apache.phoenix.mapreduce.index.IndexTool; -import org.apache.phoenix.memory.MemoryManager.MemoryChunk; import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.schema.ColumnFamilyNotFoundException; -import org.apache.phoenix.schema.PColumn; -import org.apache.phoenix.schema.PRow; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTableImpl; import org.apache.phoenix.schema.PTableType; -import org.apache.phoenix.schema.RowKeySchema; -import org.apache.phoenix.schema.SortOrder; import org.apache.phoenix.schema.TableNotFoundException; -import org.apache.phoenix.schema.TableRef; -import org.apache.phoenix.schema.ValueSchema.Field; import org.apache.phoenix.schema.stats.NoOpStatisticsCollector; import org.apache.phoenix.schema.stats.StatisticsCollectionRunTracker; import org.apache.phoenix.schema.stats.StatisticsCollector; import org.apache.phoenix.schema.stats.StatisticsCollectorFactory; import org.apache.phoenix.schema.stats.StatsCollectionDisabledOnServerException; -import org.apache.phoenix.schema.tuple.EncodedColumnQualiferCellsList; -import org.apache.phoenix.schema.tuple.MultiKeyValueTuple; -import org.apache.phoenix.schema.tuple.PositionBasedMultiKeyValueTuple; -import org.apache.phoenix.schema.tuple.Tuple; -import org.apache.phoenix.schema.types.PBinary; -import org.apache.phoenix.schema.types.PChar; import 
org.apache.phoenix.schema.types.PDataType; -import org.apache.phoenix.schema.types.PDouble; -import org.apache.phoenix.schema.types.PFloat; import org.apache.phoenix.schema.types.PLong; -import org.apache.phoenix.transaction.PhoenixTransactionContext; -import org.apache.phoenix.transaction.PhoenixTransactionProvider; -import org.apache.phoenix.transaction.TransactionFactory; -import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.EncodedColumnsUtil; import org.apache.phoenix.util.EnvironmentEdgeManager; -import org.apache.phoenix.util.ExpressionUtil; import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.PhoenixKeyValueUtil; -import org.apache.phoenix.util.LogUtil; import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.QueryUtil; @@ -165,15 +121,11 @@ import org.apache.phoenix.util.ScanUtil; import org.apache.phoenix.util.SchemaUtil; import org.apache.phoenix.util.ServerUtil; import org.apache.phoenix.util.ServerUtil.ConnectionType; -import org.apache.phoenix.util.StringUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.phoenix.thirdparty.com.google.common.base.Throwables; -import org.apache.phoenix.thirdparty.com.google.common.collect.Lists; -import org.apache.phoenix.thirdparty.com.google.common.collect.Sets; -import org.apache.phoenix.thirdparty.com.google.common.primitives.Ints; - +import com.google.common.base.Throwables; +import com.google.common.collect.Lists; /** * Region observer that aggregates ungrouped rows(i.e. SQL query with aggregation function and no GROUP BY). 
@@ -220,7 +172,6 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver @GuardedBy("lock") private boolean isRegionClosingOrSplitting = false; private static final Logger LOGGER = LoggerFactory.getLogger(UngroupedAggregateRegionObserver.class); - private KeyValueBuilder kvBuilder; private Configuration upsertSelectConfig; private Configuration compactionConfig; private Configuration indexWriteConfig; @@ -233,9 +184,6 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver @Override public void start(CoprocessorEnvironment e) throws IOException { - // Can't use ClientKeyValueBuilder on server-side because the memstore expects to - // be able to get a single backing buffer for a KeyValue. - this.kvBuilder = GenericKeyValueBuilder.INSTANCE; /* * We need to create a copy of region's configuration since we don't want any side effect of * setting the RpcControllerFactory. @@ -248,19 +196,45 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver * priority handlers which could result in a deadlock. 
*/ upsertSelectConfig.setClass(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY, - InterRegionServerIndexRpcControllerFactory.class, RpcControllerFactory.class); + InterRegionServerIndexRpcControllerFactory.class, RpcControllerFactory.class); compactionConfig = ServerUtil.getCompactionConfig(e.getConfiguration()); // For retries of index write failures, use the same # of retries as the rebuilder indexWriteConfig = PropertiesUtil.cloneConfig(e.getConfiguration()); indexWriteConfig.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, - e.getConfiguration().getInt(QueryServices.INDEX_REBUILD_RPC_RETRIES_COUNTER, - QueryServicesOptions.DEFAULT_INDEX_REBUILD_RPC_RETRIES_COUNTER)); + e.getConfiguration().getInt(QueryServices.INDEX_REBUILD_RPC_RETRIES_COUNTER, + QueryServicesOptions.DEFAULT_INDEX_REBUILD_RPC_RETRIES_COUNTER)); indexWriteProps = new ReadOnlyProps(indexWriteConfig.iterator()); } - public void commitBatchWithRetries(final Region region, final List<Mutation> localRegionMutations, final long blockingMemstoreSize) throws IOException { + Configuration getUpsertSelectConfig() { + return upsertSelectConfig; + } + + void incrementScansReferenceCount() throws IOException { + synchronized (lock) { + if (isRegionClosingOrSplitting) { + throw new IOException("Temporarily unable to write from scan because region is closing or splitting"); + } + scansReferenceCount++; + lock.notifyAll(); + } + } + + void decrementScansReferenceCount() { + synchronized (lock) { + scansReferenceCount--; + if (scansReferenceCount < 0) { + LOGGER.warn( + "Scan reference count went below zero. Something isn't correct. 
Resetting it back to zero"); + scansReferenceCount = 0; + } + lock.notifyAll(); + } + } + + void commitBatchWithRetries(final Region region, final List<Mutation> localRegionMutations, final long blockingMemstoreSize) throws IOException { try { commitBatch(region, localRegionMutations, blockingMemstoreSize); } catch (IOException e) { @@ -278,10 +252,10 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver } } - private void commitBatch(Region region, List<Mutation> mutations, long blockingMemstoreSize) throws IOException { - if (mutations.isEmpty()) { - return; - } + void commitBatch(Region region, List<Mutation> mutations, long blockingMemstoreSize) throws IOException { + if (mutations.isEmpty()) { + return; + } Mutation[] mutationArray = new Mutation[mutations.size()]; // When memstore size reaches blockingMemstoreSize we are waiting 3 seconds for the @@ -300,9 +274,9 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver region.batchMutate(mutations.toArray(mutationArray)); } - private void setIndexAndTransactionProperties(List<Mutation> mutations, byte[] indexUUID, - byte[] indexMaintainersPtr, byte[] txState, - byte[] clientVersionBytes, boolean useIndexProto) { + static void setIndexAndTransactionProperties(List<Mutation> mutations, byte[] indexUUID, + byte[] indexMaintainersPtr, byte[] txState, + byte[] clientVersionBytes, boolean useIndexProto) { for (Mutation m : mutations) { if (indexMaintainersPtr != null) { m.setAttribute(useIndexProto ? 
PhoenixIndexCodec.INDEX_PROTO_MD : PhoenixIndexCodec.INDEX_MD, indexMaintainersPtr); @@ -320,9 +294,9 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver } private void commitBatchWithTable(Table table, List<Mutation> mutations) throws IOException { - if (mutations.isEmpty()) { - return; - } + if (mutations.isEmpty()) { + return; + } LOGGER.debug("Committing batch of " + mutations.size() + " mutations for " + table); try { @@ -340,9 +314,9 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver * a high chance that flush might not proceed and memstore won't be freed up. * @throws IOException */ - public void checkForRegionClosingOrSplitting() throws IOException { + void checkForRegionClosingOrSplitting() throws IOException { synchronized (lock) { - if(isRegionClosingOrSplitting) { + if (isRegionClosingOrSplitting) { lock.notifyAll(); throw new IOException("Region is getting closed. Not allowing to write to avoid possible deadlock."); } @@ -369,14 +343,15 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver public static class MutationList extends ArrayList<Mutation> { private long byteSize = 0L; + public MutationList() { super(); } - - public MutationList(int size){ + + public MutationList(int size) { super(size); } - + @Override public boolean add(Mutation e) { boolean r = super.add(e); @@ -392,24 +367,25 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver @Override public void clear() { - byteSize = 0L; + byteSize = 0l; super.clear(); } } - public static long getBlockingMemstoreSize(Region region, Configuration conf) { - long flushSize = region.getTableDescriptor().getMemStoreFlushSize(); - - if (flushSize <= 0) { - flushSize = conf.getLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, - TableDescriptorBuilder.DEFAULT_MEMSTORE_FLUSH_SIZE); - } - return flushSize * (conf.getLong(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER, - 
HConstants.DEFAULT_HREGION_MEMSTORE_BLOCK_MULTIPLIER)-1); - } - + static long getBlockingMemstoreSize(Region region, Configuration conf) { + long flushSize = region.getTableDescriptor().getMemStoreFlushSize(); + + if (flushSize <= 0) { + flushSize = conf.getLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, + TableDescriptorBuilder.DEFAULT_MEMSTORE_FLUSH_SIZE); + } + return flushSize * (conf.getLong(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER, + HConstants.DEFAULT_HREGION_MEMSTORE_BLOCK_MULTIPLIER) - 1); + } + @Override - protected RegionScanner doPostScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, final Scan scan, final RegionScanner s) throws IOException, SQLException { + protected RegionScanner doPostScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, final Scan scan, + final RegionScanner s) throws IOException, SQLException { final RegionCoprocessorEnvironment env = c.getEnvironment(); final Region region = env.getRegion(); long ts = scan.getTimeRange().getMax(); @@ -436,39 +412,15 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver } }); } - - PTable.QualifierEncodingScheme encodingScheme = EncodedColumnsUtil.getQualifierEncodingScheme(scan); boolean useNewValueColumnQualifier = EncodedColumnsUtil.useNewValueColumnQualifier(scan); int offsetToBe = 0; if (localIndexScan) { - /* - * For local indexes, we need to set an offset on row key expressions to skip - * the region start key. - */ offsetToBe = region.getRegionInfo().getStartKey().length != 0 ? 
region.getRegionInfo().getStartKey().length : - region.getRegionInfo().getEndKey().length; - ScanUtil.setRowKeyOffset(scan, offsetToBe); + region.getRegionInfo().getEndKey().length; } final int offset = offsetToBe; - - PTable projectedTable = null; - PTable writeToTable = null; - byte[][] values = null; byte[] descRowKeyTableBytes = scan.getAttribute(UPGRADE_DESC_ROW_KEY); boolean isDescRowKeyOrderUpgrade = descRowKeyTableBytes != null; - if (isDescRowKeyOrderUpgrade) { - LOGGER.debug("Upgrading row key for " + region.getRegionInfo().getTable().getNameAsString()); - projectedTable = deserializeTable(descRowKeyTableBytes); - try { - writeToTable = PTableImpl.builderWithColumns(projectedTable, - getColumnsToClone(projectedTable)) - .setRowKeyOrderOptimizable(true) - .build(); - } catch (SQLException e) { - ServerUtil.throwIOException("Upgrade failed", e); // Impossible - } - values = new byte[projectedTable.getPKColumns().size()][]; - } boolean useProto = false; byte[] localIndexBytes = scan.getAttribute(LOCAL_INDEX_BUILD_PROTO); useProto = localIndexBytes != null; @@ -476,47 +428,12 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver localIndexBytes = scan.getAttribute(LOCAL_INDEX_BUILD); } List<IndexMaintainer> indexMaintainers = localIndexBytes == null ? null : IndexMaintainer.deserialize(localIndexBytes, useProto); - MutationList indexMutations = localIndexBytes == null ? new MutationList() : new MutationList(1024); - RegionScanner theScanner = s; - - byte[] replayMutations = scan.getAttribute(BaseScannerRegionObserver.REPLAY_WRITES); - byte[] indexUUID = scan.getAttribute(PhoenixIndexCodec.INDEX_UUID); - byte[] txState = scan.getAttribute(BaseScannerRegionObserver.TX_STATE); - byte[] clientVersionBytes = scan.getAttribute(BaseScannerRegionObserver.CLIENT_VERSION); - PhoenixTransactionProvider txnProvider = null; - if (txState != null) { - int clientVersion = clientVersionBytes == null ? 
ScanUtil.UNKNOWN_CLIENT_VERSION : Bytes.toInt(clientVersionBytes); - txnProvider = TransactionFactory.getTransactionProvider(txState, clientVersion); - } - List<Expression> selectExpressions = null; byte[] upsertSelectTable = scan.getAttribute(BaseScannerRegionObserver.UPSERT_SELECT_TABLE); - boolean isUpsert = false; boolean isDelete = false; - byte[] deleteCQ = null; - byte[] deleteCF = null; - byte[] emptyCF = null; - Connection targetHConn = null; - Table targetHTable = null; - boolean isPKChanging = false; - ImmutableBytesWritable ptr = new ImmutableBytesWritable(); - if (upsertSelectTable != null) { - isUpsert = true; - projectedTable = deserializeTable(upsertSelectTable); - targetHConn = ConnectionFactory.createConnection(upsertSelectConfig); - targetHTable = targetHConn.getTable( - TableName.valueOf(projectedTable.getPhysicalName().getBytes())); - selectExpressions = deserializeExpressions(scan.getAttribute(BaseScannerRegionObserver.UPSERT_SELECT_EXPRS)); - values = new byte[projectedTable.getPKColumns().size()][]; - isPKChanging = ExpressionUtil.isPkPositionChanging(new TableRef(projectedTable), selectExpressions); - } else { + if (upsertSelectTable == null) { byte[] isDeleteAgg = scan.getAttribute(BaseScannerRegionObserver.DELETE_AGG); isDelete = isDeleteAgg != null && Bytes.compareTo(PDataType.TRUE_BYTES, isDeleteAgg) == 0; - if (!isDelete) { - deleteCF = scan.getAttribute(BaseScannerRegionObserver.DELETE_CF); - deleteCQ = scan.getAttribute(BaseScannerRegionObserver.DELETE_CQ); - } - emptyCF = scan.getAttribute(BaseScannerRegionObserver.EMPTY_CF); } TupleProjector tupleProjector = null; byte[][] viewConstants = null; @@ -531,422 +448,18 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver } ImmutableBytesWritable tempPtr = new ImmutableBytesWritable(); theScanner = - getWrappedScanner(c, theScanner, offset, scan, dataColumns, tupleProjector, - region, indexMaintainers == null ? 
null : indexMaintainers.get(0), viewConstants, p, tempPtr, useQualifierAsIndex); - } - - if (j != null) { - theScanner = new HashJoinRegionScanner(theScanner, p, j, ScanUtil.getTenantId(scan), env, useQualifierAsIndex, useNewValueColumnQualifier); + getWrappedScanner(c, theScanner, offset, scan, dataColumns, tupleProjector, + region, indexMaintainers == null ? null : indexMaintainers.get(0), viewConstants, p, tempPtr, useQualifierAsIndex); } - - int maxBatchSize = 0; - long maxBatchSizeBytes = 0L; - MutationList mutations = new MutationList(); - boolean needToWrite = false; - Configuration conf = env.getConfiguration(); - - /** - * Slow down the writes if the memstore size more than - * (hbase.hregion.memstore.block.multiplier - 1) times hbase.hregion.memstore.flush.size - * bytes. This avoids flush storm to hdfs for cases like index building where reads and - * write happen to all the table regions in the server. - */ - final long blockingMemStoreSize = getBlockingMemstoreSize(region, conf) ; - boolean buildLocalIndex = indexMaintainers != null && dataColumns==null && !localIndexScan; - if(buildLocalIndex) { - checkForLocalIndexColumnFamilies(region, indexMaintainers); - } - if (isDescRowKeyOrderUpgrade || isDelete || isUpsert - || (deleteCQ != null && deleteCF != null) || emptyCF != null || buildLocalIndex) { - needToWrite = true; - if((isUpsert && (targetHTable == null || - !targetHTable.getName().equals(region.getTableDescriptor().getTableName())))) { - needToWrite = false; - } - maxBatchSize = conf.getInt(MUTATE_BATCH_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE); - mutations = new MutationList(Ints.saturatedCast(maxBatchSize + maxBatchSize / 10)); - maxBatchSizeBytes = conf.getLong(MUTATE_BATCH_SIZE_BYTES_ATTRIB, - QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE_BYTES); - } - boolean hasMore; - int rowCount = 0; - boolean hasAny = false; - boolean acquiredLock = false; - boolean incrScanRefCount = false; - Aggregators aggregators = null; - 
Aggregator[] rowAggregators = null; - final RegionScanner innerScanner = theScanner; - final TenantCache tenantCache = GlobalCache.getTenantCache(env, ScanUtil.getTenantId(scan)); - try (MemoryChunk em = tenantCache.getMemoryManager().allocate(0)) { - aggregators = ServerAggregators.deserialize( - scan.getAttribute(BaseScannerRegionObserver.AGGREGATORS), conf, em); - rowAggregators = aggregators.getAggregators(); - Pair<Integer, Integer> minMaxQualifiers = EncodedColumnsUtil.getMinMaxQualifiersFromScan(scan); - Tuple result = useQualifierAsIndex ? new PositionBasedMultiKeyValueTuple() : new MultiKeyValueTuple(); - if (LOGGER.isDebugEnabled()) { - LOGGER.debug(LogUtil.addCustomAnnotations("Starting ungrouped coprocessor scan " + scan + " "+region.getRegionInfo(), ScanUtil.getCustomAnnotations(scan))); - } - boolean useIndexProto = true; - byte[] indexMaintainersPtr = scan.getAttribute(PhoenixIndexCodec.INDEX_PROTO_MD); - // for backward compatiblity fall back to look by the old attribute - if (indexMaintainersPtr == null) { - indexMaintainersPtr = scan.getAttribute(PhoenixIndexCodec.INDEX_MD); - useIndexProto = false; - } - - if(needToWrite) { - synchronized (lock) { - if (isRegionClosingOrSplitting) { - throw new IOException("Temporarily unable to write from scan because region is closing or splitting"); - } - scansReferenceCount++; - incrScanRefCount = true; - lock.notifyAll(); - } - } - region.startRegionOperation(); - acquiredLock = true; - synchronized (innerScanner) { - do { - List<Cell> results = useQualifierAsIndex ? 
new EncodedColumnQualiferCellsList(minMaxQualifiers.getFirst(), minMaxQualifiers.getSecond(), encodingScheme) : new ArrayList<Cell>(); - // Results are potentially returned even when the return value of s.next is false - // since this is an indication of whether or not there are more values after the - // ones returned - hasMore = innerScanner.nextRaw(results); - if (!results.isEmpty()) { - rowCount++; - result.setKeyValues(results); - if (isDescRowKeyOrderUpgrade) { - Arrays.fill(values, null); - Cell firstKV = results.get(0); - RowKeySchema schema = projectedTable.getRowKeySchema(); - int maxOffset = schema.iterator(firstKV.getRowArray(), firstKV.getRowOffset() + offset, firstKV.getRowLength(), ptr); - for (int i = 0; i < schema.getFieldCount(); i++) { - Boolean hasValue = schema.next(ptr, i, maxOffset); - if (hasValue == null) { - break; - } - Field field = schema.getField(i); - if (field.getSortOrder() == SortOrder.DESC) { - // Special case for re-writing DESC ARRAY, as the actual byte value needs to change in this case - if (field.getDataType().isArrayType()) { - field.getDataType().coerceBytes(ptr, null, field.getDataType(), - field.getMaxLength(), field.getScale(), field.getSortOrder(), - field.getMaxLength(), field.getScale(), field.getSortOrder(), true); // force to use correct separator byte - } - // Special case for re-writing DESC CHAR or DESC BINARY, to force the re-writing of trailing space characters - else if (field.getDataType() == PChar.INSTANCE || field.getDataType() == PBinary.INSTANCE) { - int len = ptr.getLength(); - while (len > 0 && ptr.get()[ptr.getOffset() + len - 1] == StringUtil.SPACE_UTF8) { - len--; - } - ptr.set(ptr.get(), ptr.getOffset(), len); - // Special case for re-writing DESC FLOAT and DOUBLE, as they're not inverted like they should be (PHOENIX-2171) - } else if (field.getDataType() == PFloat.INSTANCE || field.getDataType() == PDouble.INSTANCE) { - byte[] invertedBytes = SortOrder.invert(ptr.get(), ptr.getOffset(), 
ptr.getLength()); - ptr.set(invertedBytes); - } - } else if (field.getDataType() == PBinary.INSTANCE) { - // Remove trailing space characters so that the setValues call below will replace them - // with the correct zero byte character. Note this is somewhat dangerous as these - // could be legit, but I don't know what the alternative is. - int len = ptr.getLength(); - while (len > 0 && ptr.get()[ptr.getOffset() + len - 1] == StringUtil.SPACE_UTF8) { - len--; - } - ptr.set(ptr.get(), ptr.getOffset(), len); - } - values[i] = ptr.copyBytes(); - } - writeToTable.newKey(ptr, values); - if (Bytes.compareTo( - firstKV.getRowArray(), firstKV.getRowOffset() + offset, firstKV.getRowLength(), - ptr.get(),ptr.getOffset() + offset,ptr.getLength()) == 0) { - continue; - } - byte[] newRow = ByteUtil.copyKeyBytesIfNecessary(ptr); - if (offset > 0) { // for local indexes (prepend region start key) - byte[] newRowWithOffset = new byte[offset + newRow.length]; - System.arraycopy(firstKV.getRowArray(), firstKV.getRowOffset(), newRowWithOffset, 0, offset); - System.arraycopy(newRow, 0, newRowWithOffset, offset, newRow.length); - newRow = newRowWithOffset; - } - byte[] oldRow = Bytes.copy(firstKV.getRowArray(), firstKV.getRowOffset(), firstKV.getRowLength()); - for (Cell cell : results) { - // Copy existing cell but with new row key - Cell newCell = - CellBuilderFactory.create(CellBuilderType.DEEP_COPY). - setRow(newRow). - setFamily(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()). - setQualifier(cell.getQualifierArray(), - cell.getQualifierOffset(), cell.getQualifierLength()). - setTimestamp(cell.getTimestamp()). - setType(cell.getType()).setValue(cell.getValueArray(), - cell.getValueOffset(), cell.getValueLength()).build(); - switch (cell.getType()) { - case Put: - // If Put, point delete old Put - Delete del = new Delete(oldRow); - Cell newDelCell = - CellBuilderFactory.create(CellBuilderType.DEEP_COPY). - setRow(newRow). 
- setFamily(cell.getFamilyArray(), cell.getFamilyOffset(), - cell.getFamilyLength()). - setQualifier(cell.getQualifierArray(), - cell.getQualifierOffset(), cell.getQualifierLength()). - setTimestamp(cell.getTimestamp()). - setType(Cell.Type.Delete). - setValue(ByteUtil.EMPTY_BYTE_ARRAY, - 0, 0).build(); - del.add(newDelCell); - mutations.add(del); - - Put put = new Put(newRow); - put.add(newCell); - mutations.add(put); - break; - case Delete: - case DeleteColumn: - case DeleteFamily: - case DeleteFamilyVersion: - Delete delete = new Delete(newRow); - delete.add(newCell); - mutations.add(delete); - break; - } - } - } else if (buildLocalIndex) { - for (IndexMaintainer maintainer : indexMaintainers) { - if (!results.isEmpty()) { - result.getKey(ptr); - ValueGetter valueGetter = - maintainer.createGetterFromKeyValues( - ImmutableBytesPtr.copyBytesIfNecessary(ptr), - results); - Put put = maintainer.buildUpdateMutation(kvBuilder, - valueGetter, ptr, results.get(0).getTimestamp(), - env.getRegion().getRegionInfo().getStartKey(), - env.getRegion().getRegionInfo().getEndKey()); - - if (txnProvider != null) { - put = txnProvider.markPutAsCommitted(put, ts, ts); - } - indexMutations.add(put); - } - } - result.setKeyValues(results); - } else if (isDelete) { - // FIXME: the version of the Delete constructor without the lock - // args was introduced in 0.94.4, thus if we try to use it here - // we can no longer use the 0.94.2 version of the client. 
- Cell firstKV = results.get(0); - Delete delete = new Delete(firstKV.getRowArray(), - firstKV.getRowOffset(), firstKV.getRowLength(),ts); - if (replayMutations != null) { - delete.setAttribute(REPLAY_WRITES, replayMutations); - } - mutations.add(delete); - // force tephra to ignore this deletes - delete.setAttribute(PhoenixTransactionContext.TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]); - } else if (isUpsert) { - Arrays.fill(values, null); - int bucketNumOffset = 0; - if (projectedTable.getBucketNum() != null) { - values[0] = new byte[] { 0 }; - bucketNumOffset = 1; - } - int i = bucketNumOffset; - List<PColumn> projectedColumns = projectedTable.getColumns(); - for (; i < projectedTable.getPKColumns().size(); i++) { - Expression expression = selectExpressions.get(i - bucketNumOffset); - if (expression.evaluate(result, ptr)) { - values[i] = ptr.copyBytes(); - // If SortOrder from expression in SELECT doesn't match the - // column being projected into then invert the bits. - if (expression.getSortOrder() != - projectedColumns.get(i).getSortOrder()) { - SortOrder.invert(values[i], 0, values[i], 0, - values[i].length); - } - }else{ - values[i] = ByteUtil.EMPTY_BYTE_ARRAY; - } - } - projectedTable.newKey(ptr, values); - PRow row = projectedTable.newRow(kvBuilder, ts, ptr, false); - for (; i < projectedColumns.size(); i++) { - Expression expression = selectExpressions.get(i - bucketNumOffset); - if (expression.evaluate(result, ptr)) { - PColumn column = projectedColumns.get(i); - if (!column.getDataType().isSizeCompatible(ptr, null, - expression.getDataType(), expression.getSortOrder(), - expression.getMaxLength(), expression.getScale(), - column.getMaxLength(), column.getScale())) { - throw new DataExceedsCapacityException( - column.getDataType(), - column.getMaxLength(), - column.getScale(), - column.getName().getString()); - } - column.getDataType().coerceBytes(ptr, null, - expression.getDataType(), expression.getMaxLength(), - expression.getScale(), 
expression.getSortOrder(), - column.getMaxLength(), column.getScale(), - column.getSortOrder(), projectedTable.rowKeyOrderOptimizable()); - byte[] bytes = ByteUtil.copyKeyBytesIfNecessary(ptr); - row.setValue(column, bytes); - } - } - for (Mutation mutation : row.toRowMutations()) { - if (replayMutations != null) { - mutation.setAttribute(REPLAY_WRITES, replayMutations); - } else if (txnProvider != null && projectedTable.getType() == PTableType.INDEX) { - mutation = txnProvider.markPutAsCommitted((Put)mutation, ts, ts); - } - mutations.add(mutation); - } - for (i = 0; i < selectExpressions.size(); i++) { - selectExpressions.get(i).reset(); - } - } else if (deleteCF != null && deleteCQ != null) { - // No need to search for delete column, since we project only it - // if no empty key value is being set - if (emptyCF == null || - result.getValue(deleteCF, deleteCQ) != null) { - Delete delete = new Delete(results.get(0).getRowArray(), - results.get(0).getRowOffset(), - results.get(0).getRowLength()); - delete.addColumns(deleteCF, deleteCQ, ts); - // force tephra to ignore this deletes - delete.setAttribute(PhoenixTransactionContext.TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]); - mutations.add(delete); - } - } - if (emptyCF != null) { - /* - * If we've specified an emptyCF, then we need to insert an empty - * key value "retroactively" for any key value that is visible at - * the timestamp that the DDL was issued. Key values that are not - * visible at this timestamp will not ever be projected up to - * scans past this timestamp, so don't need to be considered. - * We insert one empty key value per row per timestamp. 
- */ - Set<Long> timeStamps = - Sets.newHashSetWithExpectedSize(results.size()); - for (Cell kv : results) { - long kvts = kv.getTimestamp(); - if (!timeStamps.contains(kvts)) { - Put put = new Put(kv.getRowArray(), kv.getRowOffset(), - kv.getRowLength()); - put.addColumn(emptyCF, QueryConstants.EMPTY_COLUMN_BYTES, kvts, - ByteUtil.EMPTY_BYTE_ARRAY); - mutations.add(put); - } - } - } - if (ServerUtil.readyToCommit(mutations.size(), mutations.byteSize(), maxBatchSize, maxBatchSizeBytes)) { - commit(region, mutations, indexUUID, blockingMemStoreSize, indexMaintainersPtr, - txState, targetHTable, useIndexProto, isPKChanging, clientVersionBytes); - mutations.clear(); - } - // Commit in batches based on UPSERT_BATCH_SIZE_BYTES_ATTRIB in config - - if (ServerUtil.readyToCommit(indexMutations.size(), indexMutations.byteSize(), maxBatchSize, maxBatchSizeBytes)) { - setIndexAndTransactionProperties(indexMutations, indexUUID, indexMaintainersPtr, txState, clientVersionBytes, useIndexProto); - commitBatch(region, indexMutations, blockingMemStoreSize); - indexMutations.clear(); - } - aggregators.aggregate(rowAggregators, result); - hasAny = true; - } - } while (hasMore); - if (!mutations.isEmpty()) { - commit(region, mutations, indexUUID, blockingMemStoreSize, indexMaintainersPtr, txState, - targetHTable, useIndexProto, isPKChanging, clientVersionBytes); - mutations.clear(); - } - - if (!indexMutations.isEmpty()) { - commitBatch(region, indexMutations, blockingMemStoreSize); - indexMutations.clear(); - } - } - } finally { - if (needToWrite && incrScanRefCount) { - synchronized (lock) { - scansReferenceCount--; - if (scansReferenceCount < 0) { - LOGGER.warn( - "Scan reference count went below zero. Something isn't correct. 
Resetting it back to zero"); - scansReferenceCount = 0; - } - lock.notifyAll(); - } - } - try { - if (targetHTable != null) { - try { - targetHTable.close(); - } catch (IOException e) { - LOGGER.error("Closing table: " + targetHTable + " failed: ", e); - } - } - if (targetHConn != null) { - try { - targetHConn.close(); - } catch (IOException e) { - LOGGER.error("Closing connection: " + targetHConn + " failed: ", e); - } - } - } finally { - try { - innerScanner.close(); - } finally { - if (acquiredLock) region.closeRegionOperation(); - } - } - } - if (LOGGER.isDebugEnabled()) { - LOGGER.debug(LogUtil.addCustomAnnotations("Finished scanning " + rowCount + " rows for ungrouped coprocessor scan " + scan, ScanUtil.getCustomAnnotations(scan))); - } - - final boolean hadAny = hasAny; - Cell keyValue = null; - if (hadAny) { - byte[] value = aggregators.toBytes(rowAggregators); - keyValue = PhoenixKeyValueUtil.newKeyValue(UNGROUPED_AGG_ROW_KEY, SINGLE_COLUMN_FAMILY, - SINGLE_COLUMN, AGG_TIMESTAMP, value, 0, value.length); + if (j != null) { + theScanner = new HashJoinRegionScanner(theScanner, p, j, ScanUtil.getTenantId(scan), env, useQualifierAsIndex, useNewValueColumnQualifier); } - final Cell aggKeyValue = keyValue; - - RegionScanner scanner = new BaseRegionScanner(innerScanner) { - private boolean done = !hadAny; - - @Override - public boolean isFilterDone() { - return done; - } - - @Override - public boolean next(List<Cell> results) throws IOException { - if (done) return false; - done = true; - results.add(aggKeyValue); - return false; - } - - @Override - public long getMaxResultSize() { - return scan.getMaxResultSize(); - } - }; - return scanner; - + return new UngroupedAggregateRegionScanner(c, theScanner, region, scan, env, this); } - private void checkForLocalIndexColumnFamilies(Region region, - List<IndexMaintainer> indexMaintainers) throws IOException { + public static void checkForLocalIndexColumnFamilies(Region region, + List<IndexMaintainer> indexMaintainers) 
throws IOException { TableDescriptor tableDesc = region.getTableDescriptor(); String schemaName = tableDesc.getTableName().getNamespaceAsString() @@ -956,13 +469,13 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver String tableName = SchemaUtil.getTableNameFromFullName(tableDesc.getTableName().getNameAsString()); for (IndexMaintainer indexMaintainer : indexMaintainers) { Set<ColumnReference> coveredColumns = indexMaintainer.getCoveredColumns(); - if(coveredColumns.isEmpty()) { + if (coveredColumns.isEmpty()) { byte[] localIndexCf = indexMaintainer.getEmptyKeyValueFamily().get(); // When covered columns empty we store index data in default column family so check for it. if (tableDesc.getColumnFamily(localIndexCf) == null) { ServerUtil.throwIOException("Column Family Not Found", - new ColumnFamilyNotFoundException(schemaName, tableName, Bytes - .toString(localIndexCf))); + new ColumnFamilyNotFoundException(schemaName, tableName, Bytes + .toString(localIndexCf))); } } for (ColumnReference reference : coveredColumns) { @@ -970,21 +483,21 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver ColumnFamilyDescriptor family = region.getTableDescriptor().getColumnFamily(cf); if (family == null) { ServerUtil.throwIOException("Column Family Not Found", - new ColumnFamilyNotFoundException(schemaName, tableName, Bytes.toString(cf))); + new ColumnFamilyNotFoundException(schemaName, tableName, Bytes.toString(cf))); } } } } - private void commit(final Region region, List<Mutation> mutations, byte[] indexUUID, final long blockingMemStoreSize, - byte[] indexMaintainersPtr, byte[] txState, final Table targetHTable, boolean useIndexProto, - boolean isPKChanging, byte[] clientVersionBytes) + void commit(final Region region, List<Mutation> mutations, byte[] indexUUID, final long blockingMemStoreSize, + byte[] indexMaintainersPtr, byte[] txState, final Table targetHTable, boolean useIndexProto, + boolean isPKChanging, byte[] 
clientVersionBytes) throws IOException { final List<Mutation> localRegionMutations = Lists.newArrayList(); final List<Mutation> remoteRegionMutations = Lists.newArrayList(); setIndexAndTransactionProperties(mutations, indexUUID, indexMaintainersPtr, txState, clientVersionBytes, useIndexProto); separateLocalAndRemoteMutations(targetHTable, region, mutations, localRegionMutations, remoteRegionMutations, - isPKChanging); + isPKChanging); commitBatchWithRetries(region, localRegionMutations, blockingMemStoreSize); try { commitBatchWithTable(targetHTable, remoteRegionMutations); @@ -1006,7 +519,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver } private void handleIndexWriteException(final List<Mutation> localRegionMutations, IOException origIOE, - MutateCommand mutateCommand) throws IOException { + MutateCommand mutateCommand) throws IOException { long serverTimestamp = ServerUtil.parseTimestampFromRemoteException(origIOE); SQLException inferredE = ServerUtil.parseLocalOrRemoteServerException(origIOE); if (inferredE != null && inferredE.getErrorCode() == SQLExceptionCode.INDEX_WRITE_FAILURE.getErrorCode()) { @@ -1015,20 +528,20 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver for (Mutation mutation : localRegionMutations) { if (PhoenixIndexMetaData.isIndexRebuild(mutation.getAttributesMap())) { mutation.setAttribute(BaseScannerRegionObserver.REPLAY_WRITES, - BaseScannerRegionObserver.REPLAY_INDEX_REBUILD_WRITES); + BaseScannerRegionObserver.REPLAY_INDEX_REBUILD_WRITES); } else { mutation.setAttribute(BaseScannerRegionObserver.REPLAY_WRITES, - BaseScannerRegionObserver.REPLAY_ONLY_INDEX_WRITES); + BaseScannerRegionObserver.REPLAY_ONLY_INDEX_WRITES); } // use the server timestamp for index write retrys PhoenixKeyValueUtil.setTimestamp(mutation, serverTimestamp); } IndexWriteException iwe = PhoenixIndexFailurePolicy.getIndexWriteException(inferredE); try (PhoenixConnection conn = - 
QueryUtil.getConnectionOnServer(indexWriteConfig) - .unwrap(PhoenixConnection.class)) { + QueryUtil.getConnectionOnServer(indexWriteConfig) + .unwrap(PhoenixConnection.class)) { PhoenixIndexFailurePolicy.doBatchWithRetries(mutateCommand, iwe, conn, - indexWriteProps); + indexWriteProps); } catch (Exception e) { throw new DoNotRetryIOException(e); } @@ -1039,14 +552,14 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver private void separateLocalAndRemoteMutations(Table targetHTable, Region region, List<Mutation> mutations, List<Mutation> localRegionMutations, List<Mutation> remoteRegionMutations, - boolean isPKChanging){ + boolean isPKChanging) { boolean areMutationsInSameTable = areMutationsInSameTable(targetHTable, region); //if we're writing to the same table, but the PK can change, that means that some //mutations might be in our current region, and others in a different one. if (areMutationsInSameTable && isPKChanging) { RegionInfo regionInfo = region.getRegionInfo(); - for (Mutation mutation : mutations){ - if (regionInfo.containsRow(mutation.getRow())){ + for (Mutation mutation : mutations) { + if (regionInfo.containsRow(mutation.getRow())) { localRegionMutations.add(mutation); } else { remoteRegionMutations.add(mutation); @@ -1066,9 +579,8 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver @Override public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store, - InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker, - CompactionRequest request) - throws IOException { + InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker, + CompactionRequest request) throws IOException { if (scanType.equals(ScanType.COMPACT_DROP_DELETES)) { final TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable(); // Compaction and split upcalls run with the effective user context of the requesting user. 
@@ -1076,7 +588,8 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver // the login user. Switch to the login user context to ensure we have the expected // security context. return User.runAsLoginUser(new PrivilegedExceptionAction<InternalScanner>() { - @Override public InternalScanner run() throws Exception { + @Override + public InternalScanner run() throws Exception { InternalScanner internalScanner = scanner; try { long clientTimeStamp = EnvironmentEdgeManager.currentTimeMillis(); @@ -1084,8 +597,8 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver new DelegateRegionCoprocessorEnvironment(c.getEnvironment(), ConnectionType.COMPACTION_CONNECTION); StatisticsCollector statisticsCollector = StatisticsCollectorFactory.createStatisticsCollector( - compactionConfEnv, table.getNameAsString(), clientTimeStamp, - store.getColumnFamilyDescriptor().getName()); + compactionConfEnv, table.getNameAsString(), clientTimeStamp, + store.getColumnFamilyDescriptor().getName()); statisticsCollector.init(); internalScanner = statisticsCollector.createCompactionScanner(compactionConfEnv, store, internalScanner); } catch (Exception e) { @@ -1102,7 +615,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver return scanner; } - private static PTable deserializeTable(byte[] b) { + static PTable deserializeTable(byte[] b) { try { PTableProtos.PTable ptableProto = PTableProtos.PTable.parseFrom(b); return PTableImpl.createFromProto(ptableProto); @@ -1119,19 +632,19 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver } else { if (region.getTableDescriptor().hasCoprocessor(GlobalIndexChecker.class.getCanonicalName())) { return new IndexRepairRegionScanner(innerScanner, region, scan, env, this); - } else { + } else { return new IndexRebuildRegionScanner(innerScanner, region, scan, env, this); } } - } + private RegionScanner rebuildIndices(final RegionScanner innerScanner, 
final Region region, final Scan scan, final RegionCoprocessorEnvironment env) throws IOException { boolean oldCoproc = region.getTableDescriptor().hasCoprocessor(Indexer.class.getCanonicalName()); byte[] valueBytes = scan.getAttribute(BaseScannerRegionObserver.INDEX_REBUILD_VERIFY_TYPE); IndexTool.IndexVerifyType verifyType = (valueBytes != null) ? IndexTool.IndexVerifyType.fromValue(valueBytes) : IndexTool.IndexVerifyType.NONE; - if (oldCoproc && verifyType == IndexTool.IndexVerifyType.ONLY) { + if (oldCoproc && verifyType == IndexTool.IndexVerifyType.ONLY) { return new IndexerRegionScanner(innerScanner, region, scan, env, this); } if (!scan.isRaw()) { @@ -1158,9 +671,9 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver } return getRegionScanner(innerScanner, region, scan, env, oldCoproc); } - + private RegionScanner collectStats(final RegionScanner innerScanner, StatisticsCollector stats, - final Region region, final Scan scan, Configuration config) throws IOException { + final Region region, final Scan scan, Configuration config) throws IOException { StatsCollectionCallable callable = new StatsCollectionCallable(stats, region, innerScanner, config, scan); byte[] asyncBytes = scan.getAttribute(BaseScannerRegionObserver.RUN_UPDATE_STATS_ASYNC_ATTRIB); @@ -1171,7 +684,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver long rowCount = 0; // in case of async, we report 0 as number of rows updated StatisticsCollectionRunTracker statsRunTracker = StatisticsCollectionRunTracker.getInstance(config); - final boolean runUpdateStats = statsRunTracker.addUpdateStatsCommandRegion(region.getRegionInfo(),scan.getFamilyMap().keySet()); + final boolean runUpdateStats = statsRunTracker.addUpdateStatsCommandRegion(region.getRegionInfo(), scan.getFamilyMap().keySet()); if (runUpdateStats) { if (!async) { rowCount = callable.call(); @@ -1186,7 +699,7 @@ public class UngroupedAggregateRegionObserver extends 
BaseScannerRegionObserver byte[] rowCountBytes = PLong.INSTANCE.toBytes(Long.valueOf(rowCount)); final Cell aggKeyValue = PhoenixKeyValueUtil.newKeyValue(UNGROUPED_AGG_ROW_KEY, SINGLE_COLUMN_FAMILY, - SINGLE_COLUMN, AGG_TIMESTAMP, rowCountBytes, 0, rowCountBytes.length); + SINGLE_COLUMN, AGG_TIMESTAMP, rowCountBytes, 0, rowCountBytes.length); RegionScanner scanner = new BaseRegionScanner(innerScanner) { @Override public RegionInfo getRegionInfo() { @@ -1236,7 +749,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver private final Scan scan; StatsCollectionCallable(StatisticsCollector s, Region r, RegionScanner rs, - Configuration config, Scan scan) { + Configuration config, Scan scan) { this.statsCollector = s; this.region = r; this.innerScanner = rs; @@ -1307,7 +820,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver } } - private static List<Expression> deserializeExpressions(byte[] b) { + static List<Expression> deserializeExpressions(byte[] b) { ByteArrayInputStream stream = new ByteArrayInputStream(b); try { DataInputStream input = new DataInputStream(stream); @@ -1362,7 +875,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver // the coprocessors to avoid dead lock scenario. See PHOENIX-3111. 
private void waitForScansToFinish(ObserverContext<RegionCoprocessorEnvironment> c) throws IOException { int maxWaitTime = c.getEnvironment().getConfiguration().getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, - HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); + HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); long start = EnvironmentEdgeManager.currentTimeMillis(); synchronized (lock) { isRegionClosingOrSplitting = true; @@ -1372,10 +885,10 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver if (EnvironmentEdgeManager.currentTimeMillis() - start >= maxWaitTime) { isRegionClosingOrSplitting = false; // must reset in case split is not retried throw new IOException(String.format( - "Operations like local index building/delete/upsert select" - + " might be going on so not allowing to split/close. scansReferenceCount=%s region=%s", - scansReferenceCount, - c.getEnvironment().getRegionInfo().getRegionNameAsString())); + "Operations like local index building/delete/upsert select" + + " might be going on so not allowing to split/close. scansReferenceCount=%s region=%s", + scansReferenceCount, + c.getEnvironment().getRegionInfo().getRegionNameAsString())); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -1386,7 +899,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver @Override public void preBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> c, - List<Pair<byte[], String>> familyPaths) throws IOException { + List<Pair<byte[], String>> familyPaths) throws IOException { // Don't allow bulkload if operations need read and write to same region are going on in the // the coprocessors to avoid dead lock scenario. See PHOENIX-3111. synchronized (lock) { @@ -1416,6 +929,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver // This will lead to failure of cross cluster RPC if the effective user is not // the login user. 
Switch to the login user context to ensure we have the expected // security context. + final String fullTableName = c.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString(); // since we will make a call to syscat, do nothing if we are compacting syscat itself if (request.isMajor() && !PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME.equals(fullTableName)) { @@ -1424,15 +938,15 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver public Void run() throws Exception { // If the index is disabled, keep the deleted cells so the rebuild doesn't corrupt the index try (PhoenixConnection conn = - QueryUtil.getConnectionOnServer(compactionConfig).unwrap(PhoenixConnection.class)) { + QueryUtil.getConnectionOnServer(compactionConfig).unwrap(PhoenixConnection.class)) { PTable table = PhoenixRuntime.getTableNoCache(conn, fullTableName); List<PTable> indexes = PTableType.INDEX.equals(table.getType()) ? Lists.newArrayList(table) : table.getIndexes(); // FIXME need to handle views and indexes on views as well for (PTable index : indexes) { if (index.getIndexDisableTimestamp() != 0) { LOGGER.info( - "Modifying major compaction scanner to retain deleted cells for a table with disabled index: " - + fullTableName); + "Modifying major compaction scanner to retain deleted cells for a table with disabled index: " + + fullTableName); options.setKeepDeletedCells(KeepDeletedCells.TRUE); options.readAllVersions(); options.setTTL(Long.MAX_VALUE); @@ -1444,8 +958,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver // non-Phoenix HBase tables won't be found, do nothing } else { LOGGER.error("Unable to modify compaction scanner to retain deleted cells for a table with disabled Index; " - + fullTableName, - e); + + fullTableName, e); } } return null; diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java new file mode 100644 index 0000000..844273a --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java @@ -0,0 +1,670 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.coprocessor; + +import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.LOCAL_INDEX_BUILD; +import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.LOCAL_INDEX_BUILD_PROTO; +import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.REPLAY_WRITES; +import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.UPGRADE_DESC_ROW_KEY; +import static org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver.checkForLocalIndexColumnFamilies; +import static org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver.deserializeExpressions; +import static org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver.deserializeTable; +import static org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver.getBlockingMemstoreSize; +import static org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver.setIndexAndTransactionProperties; +import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP; +import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN; +import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY; +import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_ATTRIB; +import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_BYTES_ATTRIB; +import static org.apache.phoenix.query.QueryServices.UNGROUPED_AGGREGATE_PAGE_SIZE_IN_MS; +import static org.apache.phoenix.schema.PTableImpl.getColumnsToClone; + +import java.io.IOException; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Set; + +import com.google.common.collect.Sets; +import com.google.common.primitives.Ints; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellBuilderFactory; +import org.apache.hadoop.hbase.CellBuilderType; +import org.apache.hadoop.hbase.CellUtil; +import 
org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.regionserver.Region; +import org.apache.hadoop.hbase.regionserver.RegionScanner; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.phoenix.cache.GlobalCache; +import org.apache.phoenix.cache.TenantCache; +import org.apache.phoenix.exception.DataExceedsCapacityException; +import org.apache.phoenix.expression.Expression; +import org.apache.phoenix.expression.aggregator.Aggregator; +import org.apache.phoenix.expression.aggregator.Aggregators; +import org.apache.phoenix.expression.aggregator.ServerAggregators; +import org.apache.phoenix.hbase.index.covered.update.ColumnReference; +import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder; +import org.apache.phoenix.index.PhoenixIndexCodec; +import org.apache.phoenix.memory.InsufficientMemoryException; +import org.apache.phoenix.memory.MemoryManager; +import org.apache.phoenix.query.QueryConstants; +import org.apache.phoenix.query.QueryServicesOptions; +import org.apache.phoenix.schema.PColumn; +import org.apache.phoenix.schema.PRow; +import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.PTableImpl; +import org.apache.phoenix.schema.PTableType; +import org.apache.phoenix.schema.RowKeySchema; +import org.apache.phoenix.schema.SortOrder; +import org.apache.phoenix.schema.TableRef; 
+import org.apache.phoenix.schema.ValueSchema; +import org.apache.phoenix.schema.tuple.EncodedColumnQualiferCellsList; +import org.apache.phoenix.schema.tuple.MultiKeyValueTuple; +import org.apache.phoenix.schema.tuple.PositionBasedMultiKeyValueTuple; +import org.apache.phoenix.schema.tuple.Tuple; +import org.apache.phoenix.schema.types.PBinary; +import org.apache.phoenix.schema.types.PChar; +import org.apache.phoenix.schema.types.PDataType; +import org.apache.phoenix.schema.types.PDouble; +import org.apache.phoenix.schema.types.PFloat; +import org.apache.phoenix.transaction.PhoenixTransactionContext; +import org.apache.phoenix.transaction.PhoenixTransactionProvider; +import org.apache.phoenix.transaction.TransactionFactory; +import org.apache.phoenix.util.ByteUtil; +import org.apache.phoenix.util.EncodedColumnsUtil; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.phoenix.hbase.index.ValueGetter; +import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; +import org.apache.phoenix.index.IndexMaintainer; +import org.apache.phoenix.util.EnvironmentEdgeManager; +import org.apache.phoenix.util.ExpressionUtil; +import org.apache.phoenix.util.IndexUtil; +import org.apache.phoenix.util.LogUtil; +import org.apache.phoenix.util.PhoenixKeyValueUtil; +import org.apache.phoenix.util.ScanUtil; +import org.apache.phoenix.util.ServerUtil; +import org.apache.phoenix.util.StringUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class UngroupedAggregateRegionScanner extends BaseRegionScanner { + + private static final Logger LOGGER = LoggerFactory.getLogger(UngroupedAggregateRegionScanner.class); + + private long pageSizeInMs = Long.MAX_VALUE; + private int maxBatchSize = 0; + private Scan scan; + private RegionScanner innerScanner; + private Region region; + private final UngroupedAggregateRegionObserver ungroupedAggregateRegionObserver; + private final RegionCoprocessorEnvironment env; + private final boolean useQualifierAsIndex; + private 
boolean needToWrite = false; + private final Pair<Integer, Integer> minMaxQualifiers; + private byte[][] values = null; + private PTable.QualifierEncodingScheme encodingScheme; + private PTable writeToTable = null; + private PTable projectedTable = null; + private boolean isDescRowKeyOrderUpgrade; + private final int offset; + private boolean buildLocalIndex; + private List<IndexMaintainer> indexMaintainers; + private boolean isPKChanging = false; + private long ts; + private PhoenixTransactionProvider txnProvider = null; + private UngroupedAggregateRegionObserver.MutationList indexMutations; + private boolean isDelete = false; + private byte[] replayMutations; + private boolean isUpsert = false; + private List<Expression> selectExpressions = null; + private byte[] deleteCQ = null; + private byte[] deleteCF = null; + private byte[] emptyCF = null; + private byte[] indexUUID; + private byte[] txState; + private byte[] clientVersionBytes; + private long blockingMemStoreSize; + private long maxBatchSizeBytes = 0L; + private Table targetHTable = null; + private boolean incrScanRefCount = false; + private byte[] indexMaintainersPtr; + private boolean useIndexProto; + private Connection targetHConn = null; + + public UngroupedAggregateRegionScanner(final ObserverContext<RegionCoprocessorEnvironment> c, + final RegionScanner innerScanner, final Region region, final Scan scan, + final RegionCoprocessorEnvironment env, + final UngroupedAggregateRegionObserver ungroupedAggregateRegionObserver) + throws IOException, SQLException{ + super(innerScanner); + this.env = env; + this.region = region; + this.scan = scan; + this.ungroupedAggregateRegionObserver = ungroupedAggregateRegionObserver; + this.innerScanner = innerScanner; + Configuration conf = env.getConfiguration(); + if (scan.getAttribute(BaseScannerRegionObserver.SERVER_PAGING) != null) { + byte[] pageSizeFromScan = + scan.getAttribute(BaseScannerRegionObserver.AGGREGATE_PAGE_SIZE_IN_MS); + if (pageSizeFromScan != null) 
{ + pageSizeInMs = Bytes.toLong(pageSizeFromScan); + } else { + pageSizeInMs = + conf.getLong(UNGROUPED_AGGREGATE_PAGE_SIZE_IN_MS, + QueryServicesOptions.DEFAULT_UNGROUPED_AGGREGATE_PAGE_SIZE_IN_MS); + } + } + ts = scan.getTimeRange().getMax(); + boolean localIndexScan = ScanUtil.isLocalIndex(scan); + encodingScheme = EncodedColumnsUtil.getQualifierEncodingScheme(scan); + int offsetToBe = 0; + if (localIndexScan) { + /* + * For local indexes, we need to set an offset on row key expressions to skip + * the region start key. + */ + offsetToBe = region.getRegionInfo().getStartKey().length != 0 ? region.getRegionInfo().getStartKey().length : + region.getRegionInfo().getEndKey().length; + ScanUtil.setRowKeyOffset(scan, offsetToBe); + } + offset = offsetToBe; + + byte[] descRowKeyTableBytes = scan.getAttribute(UPGRADE_DESC_ROW_KEY); + isDescRowKeyOrderUpgrade = descRowKeyTableBytes != null; + if (isDescRowKeyOrderUpgrade) { + LOGGER.debug("Upgrading row key for " + region.getRegionInfo().getTable().getNameAsString()); + projectedTable = deserializeTable(descRowKeyTableBytes); + try { + writeToTable = PTableImpl.builderWithColumns(projectedTable, + getColumnsToClone(projectedTable)) + .setRowKeyOrderOptimizable(true) + .build(); + } catch (SQLException e) { + ServerUtil.throwIOException("Upgrade failed", e); // Impossible + } + values = new byte[projectedTable.getPKColumns().size()][]; + } + boolean useProto = false; + byte[] localIndexBytes = scan.getAttribute(LOCAL_INDEX_BUILD_PROTO); + useProto = localIndexBytes != null; + if (localIndexBytes == null) { + localIndexBytes = scan.getAttribute(LOCAL_INDEX_BUILD); + } + indexMaintainers = localIndexBytes == null ? null : IndexMaintainer.deserialize(localIndexBytes, useProto); + indexMutations = localIndexBytes == null ? 
new UngroupedAggregateRegionObserver.MutationList() : new UngroupedAggregateRegionObserver.MutationList(1024); + + replayMutations = scan.getAttribute(REPLAY_WRITES); + indexUUID = scan.getAttribute(PhoenixIndexCodec.INDEX_UUID); + txState = scan.getAttribute(BaseScannerRegionObserver.TX_STATE); + clientVersionBytes = scan.getAttribute(BaseScannerRegionObserver.CLIENT_VERSION); + if (txState != null) { + int clientVersion = clientVersionBytes == null ? ScanUtil.UNKNOWN_CLIENT_VERSION : Bytes.toInt(clientVersionBytes); + txnProvider = TransactionFactory.getTransactionProvider(txState, clientVersion); + } + byte[] upsertSelectTable = scan.getAttribute(BaseScannerRegionObserver.UPSERT_SELECT_TABLE); + if (upsertSelectTable != null) { + isUpsert = true; + projectedTable = deserializeTable(upsertSelectTable); + targetHConn = ConnectionFactory.createConnection(ungroupedAggregateRegionObserver.getUpsertSelectConfig()); + targetHTable = targetHConn.getTable( + TableName.valueOf(projectedTable.getPhysicalName().getBytes())); + selectExpressions = deserializeExpressions(scan.getAttribute(BaseScannerRegionObserver.UPSERT_SELECT_EXPRS)); + values = new byte[projectedTable.getPKColumns().size()][]; + isPKChanging = ExpressionUtil.isPkPositionChanging(new TableRef(projectedTable), selectExpressions); + } else { + byte[] isDeleteAgg = scan.getAttribute(BaseScannerRegionObserver.DELETE_AGG); + isDelete = isDeleteAgg != null && Bytes.compareTo(PDataType.TRUE_BYTES, isDeleteAgg) == 0; + if (!isDelete) { + deleteCF = scan.getAttribute(BaseScannerRegionObserver.DELETE_CF); + deleteCQ = scan.getAttribute(BaseScannerRegionObserver.DELETE_CQ); + } + emptyCF = scan.getAttribute(BaseScannerRegionObserver.EMPTY_CF); + } + ColumnReference[] dataColumns = IndexUtil.deserializeDataTableColumnsToJoin(scan); + useQualifierAsIndex = EncodedColumnsUtil.useQualifierAsIndex(EncodedColumnsUtil.getMinMaxQualifiersFromScan(scan)); + + /** + * Slow down the writes if the memstore size more than + * 
(hbase.hregion.memstore.block.multiplier - 1) times hbase.hregion.memstore.flush.size + bytes. This avoids flush storm to hdfs for cases like index building where reads and + writes happen to all the table regions in the server. + */ + blockingMemStoreSize = getBlockingMemstoreSize(region, conf) ; + + buildLocalIndex = indexMaintainers != null && dataColumns==null && !localIndexScan; + if(buildLocalIndex) { + checkForLocalIndexColumnFamilies(region, indexMaintainers); + } + if (isDescRowKeyOrderUpgrade || isDelete || isUpsert + || (deleteCQ != null && deleteCF != null) || emptyCF != null || buildLocalIndex) { + needToWrite = true; + if((isUpsert && (targetHTable == null || + !targetHTable.getName().equals(region.getTableDescriptor().getTableName())))) { + needToWrite = false; + } + maxBatchSize = conf.getInt(MUTATE_BATCH_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE); + maxBatchSizeBytes = conf.getLong(MUTATE_BATCH_SIZE_BYTES_ATTRIB, + QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE_BYTES); + } + minMaxQualifiers = EncodedColumnsUtil.getMinMaxQualifiersFromScan(scan); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug(LogUtil.addCustomAnnotations("Starting ungrouped coprocessor scan " + scan + " " + region.getRegionInfo(), ScanUtil.getCustomAnnotations(scan))); + } + useIndexProto = true; + indexMaintainersPtr = scan.getAttribute(PhoenixIndexCodec.INDEX_PROTO_MD); + // for backward compatibility fall back to look by the old attribute + if (indexMaintainersPtr == null) { + indexMaintainersPtr = scan.getAttribute(PhoenixIndexCodec.INDEX_MD); + useIndexProto = false; + } + + if (needToWrite) { + ungroupedAggregateRegionObserver.incrementScansReferenceCount(); + incrScanRefCount = true; + } + } + + @Override + public RegionInfo getRegionInfo() { + return region.getRegionInfo(); + } + + @Override + public boolean isFilterDone() { + return false; + } + + @Override + public void close() throws IOException { + if (needToWrite && incrScanRefCount) { + 
ungroupedAggregateRegionObserver.decrementScansReferenceCount(); + } + try { + if (targetHTable != null) { + try { + targetHTable.close(); + } catch (IOException e) { + LOGGER.error("Closing table: " + targetHTable + " failed: ", e); + } + } + if (targetHConn != null) { + try { + targetHConn.close(); + } catch (IOException e) { + LOGGER.error("Closing connection: " + targetHConn + " failed: ", e); + } + } + } finally { + innerScanner.close(); + } + } + + boolean descRowKeyOrderUpgrade(List<Cell> results, ImmutableBytesWritable ptr, + UngroupedAggregateRegionObserver.MutationList mutations) throws IOException { + Arrays.fill(values, null); + Cell firstKV = results.get(0); + RowKeySchema schema = projectedTable.getRowKeySchema(); + int maxOffset = schema.iterator(firstKV.getRowArray(), firstKV.getRowOffset() + offset, firstKV.getRowLength(), ptr); + for (int i = 0; i < schema.getFieldCount(); i++) { + Boolean hasValue = schema.next(ptr, i, maxOffset); + if (hasValue == null) { + break; + } + ValueSchema.Field field = schema.getField(i); + if (field.getSortOrder() == SortOrder.DESC) { + // Special case for re-writing DESC ARRAY, as the actual byte value needs to change in this case + if (field.getDataType().isArrayType()) { + field.getDataType().coerceBytes(ptr, null, field.getDataType(), + field.getMaxLength(), field.getScale(), field.getSortOrder(), + field.getMaxLength(), field.getScale(), field.getSortOrder(), true); // force to use correct separator byte + } + // Special case for re-writing DESC CHAR or DESC BINARY, to force the re-writing of trailing space characters + else if (field.getDataType() == PChar.INSTANCE || field.getDataType() == PBinary.INSTANCE) { + int len = ptr.getLength(); + while (len > 0 && ptr.get()[ptr.getOffset() + len - 1] == StringUtil.SPACE_UTF8) { + len--; + } + ptr.set(ptr.get(), ptr.getOffset(), len); + // Special case for re-writing DESC FLOAT and DOUBLE, as they're not inverted like they should be (PHOENIX-2171) + } else if 
(field.getDataType() == PFloat.INSTANCE || field.getDataType() == PDouble.INSTANCE) { + byte[] invertedBytes = SortOrder.invert(ptr.get(), ptr.getOffset(), ptr.getLength()); + ptr.set(invertedBytes); + } + } else if (field.getDataType() == PBinary.INSTANCE) { + // Remove trailing space characters so that the setValues call below will replace them + // with the correct zero byte character. Note this is somewhat dangerous as these + // could be legit, but I don't know what the alternative is. + int len = ptr.getLength(); + while (len > 0 && ptr.get()[ptr.getOffset() + len - 1] == StringUtil.SPACE_UTF8) { + len--; + } + ptr.set(ptr.get(), ptr.getOffset(), len); + } + values[i] = ptr.copyBytes(); + } + writeToTable.newKey(ptr, values); + if (Bytes.compareTo( + firstKV.getRowArray(), firstKV.getRowOffset() + offset, firstKV.getRowLength(), + ptr.get(),ptr.getOffset() + offset,ptr.getLength()) == 0) { + return false; + } + byte[] newRow = ByteUtil.copyKeyBytesIfNecessary(ptr); + if (offset > 0) { // for local indexes (prepend region start key) + byte[] newRowWithOffset = new byte[offset + newRow.length]; + System.arraycopy(firstKV.getRowArray(), firstKV.getRowOffset(), newRowWithOffset, 0, offset); + System.arraycopy(newRow, 0, newRowWithOffset, offset, newRow.length); + newRow = newRowWithOffset; + } + byte[] oldRow = Bytes.copy(firstKV.getRowArray(), firstKV.getRowOffset(), firstKV.getRowLength()); + for (Cell cell : results) { + // Copy existing cell but with new row key + Cell newCell = + CellBuilderFactory.create(CellBuilderType.DEEP_COPY). + setRow(newRow). + setFamily(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()). + setQualifier(cell.getQualifierArray(), + cell.getQualifierOffset(), cell.getQualifierLength()). + setTimestamp(cell.getTimestamp()). 
+ setType(cell.getType()).setValue(cell.getValueArray(), + cell.getValueOffset(), cell.getValueLength()).build(); + switch (cell.getType()) { + case Put: + // If Put, point delete old Put + Delete del = new Delete(oldRow); + Cell newDelCell = + CellBuilderFactory.create(CellBuilderType.DEEP_COPY). + setRow(newRow). + setFamily(cell.getFamilyArray(), cell.getFamilyOffset(), + cell.getFamilyLength()). + setQualifier(cell.getQualifierArray(), + cell.getQualifierOffset(), cell.getQualifierLength()). + setTimestamp(cell.getTimestamp()). + setType(Cell.Type.Delete). + setValue(ByteUtil.EMPTY_BYTE_ARRAY, + 0, 0).build(); + del.add(newDelCell); + mutations.add(del); + + Put put = new Put(newRow); + put.add(newCell); + mutations.add(put); + break; + case Delete: + case DeleteColumn: + case DeleteFamily: + case DeleteFamilyVersion: + Delete delete = new Delete(newRow); + delete.add(newCell); + mutations.add(delete); + break; + } + } + return true; + } + + void buildLocalIndex(Tuple result, List<Cell> results, ImmutableBytesWritable ptr) throws IOException { + for (IndexMaintainer maintainer : indexMaintainers) { + if (!results.isEmpty()) { + result.getKey(ptr); + ValueGetter valueGetter = + maintainer.createGetterFromKeyValues( + ImmutableBytesPtr.copyBytesIfNecessary(ptr), + results); + Put put = maintainer.buildUpdateMutation(GenericKeyValueBuilder.INSTANCE, + valueGetter, ptr, results.get(0).getTimestamp(), + env.getRegion().getRegionInfo().getStartKey(), + env.getRegion().getRegionInfo().getEndKey()); + + if (txnProvider != null) { + put = txnProvider.markPutAsCommitted(put, ts, ts); + } + indexMutations.add(put); + } + } + result.setKeyValues(results); + } + void deleteRow(List<Cell> results, UngroupedAggregateRegionObserver.MutationList mutations) { + // FIXME: the version of the Delete constructor without the lock + // args was introduced in 0.94.4, thus if we try to use it here + // we can no longer use the 0.94.2 version of the client. 
+ Cell firstKV = results.get(0); + Delete delete = new Delete(firstKV.getRowArray(), + firstKV.getRowOffset(), firstKV.getRowLength(),ts); + if (replayMutations != null) { + delete.setAttribute(REPLAY_WRITES, replayMutations); + } + mutations.add(delete); + // force tephra to ignore this delete + delete.setAttribute(PhoenixTransactionContext.TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]); + } + + void deleteCForQ(Tuple result, List<Cell> results, UngroupedAggregateRegionObserver.MutationList mutations) { + // No need to search for delete column, since we project only it + // if no empty key value is being set + if (emptyCF == null || + result.getValue(deleteCF, deleteCQ) != null) { + Delete delete = new Delete(results.get(0).getRowArray(), + results.get(0).getRowOffset(), + results.get(0).getRowLength()); + delete.addColumns(deleteCF, deleteCQ, ts); + // force tephra to ignore this delete + delete.setAttribute(PhoenixTransactionContext.TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]); + mutations.add(delete); + } + } + void upsert(Tuple result, ImmutableBytesWritable ptr, UngroupedAggregateRegionObserver.MutationList mutations) { + Arrays.fill(values, null); + int bucketNumOffset = 0; + if (projectedTable.getBucketNum() != null) { + values[0] = new byte[] { 0 }; + bucketNumOffset = 1; + } + int i = bucketNumOffset; + List<PColumn> projectedColumns = projectedTable.getColumns(); + for (; i < projectedTable.getPKColumns().size(); i++) { + Expression expression = selectExpressions.get(i - bucketNumOffset); + if (expression.evaluate(result, ptr)) { + values[i] = ptr.copyBytes(); + // If SortOrder from expression in SELECT doesn't match the + // column being projected into then invert the bits. 
+ if (expression.getSortOrder() != + projectedColumns.get(i).getSortOrder()) { + SortOrder.invert(values[i], 0, values[i], 0, + values[i].length); + } + }else{ + values[i] = ByteUtil.EMPTY_BYTE_ARRAY; + } + } + projectedTable.newKey(ptr, values); + PRow row = projectedTable.newRow(GenericKeyValueBuilder.INSTANCE, ts, ptr, false); + for (; i < projectedColumns.size(); i++) { + Expression expression = selectExpressions.get(i - bucketNumOffset); + if (expression.evaluate(result, ptr)) { + PColumn column = projectedColumns.get(i); + if (!column.getDataType().isSizeCompatible(ptr, null, + expression.getDataType(), expression.getSortOrder(), + expression.getMaxLength(), expression.getScale(), + column.getMaxLength(), column.getScale())) { + throw new DataExceedsCapacityException( + column.getDataType(), + column.getMaxLength(), + column.getScale(), + column.getName().getString()); + } + column.getDataType().coerceBytes(ptr, null, + expression.getDataType(), expression.getMaxLength(), + expression.getScale(), expression.getSortOrder(), + column.getMaxLength(), column.getScale(), + column.getSortOrder(), projectedTable.rowKeyOrderOptimizable()); + byte[] bytes = ByteUtil.copyKeyBytesIfNecessary(ptr); + row.setValue(column, bytes); + } + } + for (Mutation mutation : row.toRowMutations()) { + if (replayMutations != null) { + mutation.setAttribute(REPLAY_WRITES, replayMutations); + } else if (txnProvider != null && projectedTable.getType() == PTableType.INDEX) { + mutation = txnProvider.markPutAsCommitted((Put)mutation, ts, ts); + } + mutations.add(mutation); + } + for (i = 0; i < selectExpressions.size(); i++) { + selectExpressions.get(i).reset(); + } + } + + void insertEmptyKeyValue(List<Cell> results, UngroupedAggregateRegionObserver.MutationList mutations) { + Set<Long> timeStamps = + Sets.newHashSetWithExpectedSize(results.size()); + for (Cell kv : results) { + long kvts = kv.getTimestamp(); + if (!timeStamps.contains(kvts)) { + Put put = new Put(kv.getRowArray(), 
kv.getRowOffset(), + kv.getRowLength()); + put.addColumn(emptyCF, QueryConstants.EMPTY_COLUMN_BYTES, kvts, + ByteUtil.EMPTY_BYTE_ARRAY); + mutations.add(put); + } + } + } + @Override + public boolean next(List<Cell> resultsToReturn) throws IOException { + boolean hasMore; + long startTime = EnvironmentEdgeManager.currentTimeMillis(); + Configuration conf = env.getConfiguration(); + final TenantCache tenantCache = GlobalCache.getTenantCache(env, ScanUtil.getTenantId(scan)); + try (MemoryManager.MemoryChunk em = tenantCache.getMemoryManager().allocate(0)) { + Aggregators aggregators = ServerAggregators.deserialize( + scan.getAttribute(BaseScannerRegionObserver.AGGREGATORS), conf, em); + Aggregator[] rowAggregators = aggregators.getAggregators(); + aggregators.reset(rowAggregators); + Cell lastCell = null; + boolean hasAny = false; + ImmutableBytesWritable ptr = new ImmutableBytesWritable(); + Tuple result = useQualifierAsIndex ? new PositionBasedMultiKeyValueTuple() : new MultiKeyValueTuple(); + UngroupedAggregateRegionObserver.MutationList mutations = new UngroupedAggregateRegionObserver.MutationList(); + if (isDescRowKeyOrderUpgrade || isDelete || isUpsert + || (deleteCQ != null && deleteCF != null) || emptyCF != null || buildLocalIndex) { + mutations = new UngroupedAggregateRegionObserver.MutationList(Ints.saturatedCast(maxBatchSize + maxBatchSize / 10)); + } + region.startRegionOperation(); + try { + synchronized (innerScanner) { + do { + ungroupedAggregateRegionObserver.checkForRegionClosingOrSplitting(); + List<Cell> results = useQualifierAsIndex ? 
new EncodedColumnQualiferCellsList(minMaxQualifiers.getFirst(), minMaxQualifiers.getSecond(), encodingScheme) : new ArrayList<Cell>(); + // Results are potentially returned even when the return value of s.next is false + // since this is an indication of whether or not there are more values after the + // ones returned + hasMore = innerScanner.nextRaw(results); + if (!results.isEmpty()) { + lastCell = results.get(0); + result.setKeyValues(results); + if (isDescRowKeyOrderUpgrade) { + if (!descRowKeyOrderUpgrade(results, ptr, mutations)) { + continue; + } + } else if (buildLocalIndex) { + buildLocalIndex(result, results, ptr); + } else if (isDelete) { + deleteRow(results, mutations); + } else if (isUpsert) { + upsert(result, ptr, mutations); + } else if (deleteCF != null && deleteCQ != null) { + deleteCForQ(result, results, mutations); + } + if (emptyCF != null) { + /* + * If we've specified an emptyCF, then we need to insert an empty + * key value "retroactively" for any key value that is visible at + * the timestamp that the DDL was issued. Key values that are not + * visible at this timestamp will not ever be projected up to + * scans past this timestamp, so don't need to be considered. + * We insert one empty key value per row per timestamp. 
+ */ + insertEmptyKeyValue(results, mutations); + } + if (ServerUtil.readyToCommit(mutations.size(), mutations.byteSize(), maxBatchSize, maxBatchSizeBytes)) { + ungroupedAggregateRegionObserver.commit(region, mutations, indexUUID, blockingMemStoreSize, indexMaintainersPtr, + txState, targetHTable, useIndexProto, isPKChanging, clientVersionBytes); + mutations.clear(); + } + // Commit in batches based on UPSERT_BATCH_SIZE_BYTES_ATTRIB in config + + if (ServerUtil.readyToCommit(indexMutations.size(), indexMutations.byteSize(), maxBatchSize, maxBatchSizeBytes)) { + setIndexAndTransactionProperties(indexMutations, indexUUID, indexMaintainersPtr, txState, clientVersionBytes, useIndexProto); + ungroupedAggregateRegionObserver.commitBatch(region, indexMutations, blockingMemStoreSize); + indexMutations.clear(); + } + aggregators.aggregate(rowAggregators, result); + hasAny = true; + } + } while (hasMore && (EnvironmentEdgeManager.currentTimeMillis() - startTime) < pageSizeInMs); + + if (!mutations.isEmpty()) { + ungroupedAggregateRegionObserver.commit(region, mutations, indexUUID, blockingMemStoreSize, indexMaintainersPtr, txState, + targetHTable, useIndexProto, isPKChanging, clientVersionBytes); + mutations.clear(); + } + if (!indexMutations.isEmpty()) { + ungroupedAggregateRegionObserver.commitBatch(region, indexMutations, blockingMemStoreSize); + indexMutations.clear(); + } + } + } catch (InsufficientMemoryException e) { + throw new DoNotRetryIOException(e); + } catch (DataExceedsCapacityException e) { + throw new DoNotRetryIOException(e.getMessage(), e); + } catch (Throwable e) { + LOGGER.error("Exception in UngroupedAggreagteRegionScanner for region " + + region.getRegionInfo().getRegionNameAsString(), e); + throw e; + } + Cell keyValue; + if (hasAny) { + byte[] value = aggregators.toBytes(rowAggregators); + keyValue = PhoenixKeyValueUtil.newKeyValue(CellUtil.cloneRow(lastCell), SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, + AGG_TIMESTAMP, value, 0, value.length); + 
resultsToReturn.add(keyValue); + } + return hasMore; + } finally { + region.closeRegionOperation(); + } + } + + @Override + public long getMaxResultSize() { + return scan.getMaxResultSize(); + } +} diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java index 583baa8..3d7447a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java @@ -26,6 +26,7 @@ import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.NO import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.RENEWED; import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.THRESHOLD_NOT_REACHED; import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.UNINITIALIZED; +import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES; import java.io.IOException; import java.sql.SQLException; @@ -39,10 +40,12 @@ import javax.annotation.concurrent.GuardedBy; import org.apache.hadoop.hbase.client.AbstractClientScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Bytes; import org.apache.phoenix.cache.ServerCacheClient.ServerCache; import org.apache.phoenix.compile.QueryPlan; +import org.apache.phoenix.coprocessor.BaseScannerRegionObserver; import org.apache.phoenix.coprocessor.HashJoinCacheNotFoundException; import org.apache.phoenix.execute.BaseQueryPlan; import org.apache.phoenix.execute.MutationState; @@ -134,6 +137,7 @@ public class TableResultIterator implements ResultIterator { this.retry=plan.getContext().getConnection().getQueryServices().getProps() 
.getInt(QueryConstants.HASH_JOIN_CACHE_RETRIES, QueryConstants.DEFAULT_HASH_JOIN_CACHE_RETRIES); IndexUtil.setScanAttributesForIndexReadRepair(scan, table, plan.getContext().getConnection()); + scan.setAttribute(BaseScannerRegionObserver.SERVER_PAGING, TRUE_BYTES); } @Override diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/UngroupedAggregatingResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/UngroupedAggregatingResultIterator.java index 0bf5982..d19c5b2 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/UngroupedAggregatingResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/UngroupedAggregatingResultIterator.java @@ -21,6 +21,7 @@ import static org.apache.phoenix.query.QueryConstants.*; import java.sql.SQLException; +import org.apache.phoenix.expression.aggregator.Aggregator; import org.apache.phoenix.expression.aggregator.Aggregators; import org.apache.phoenix.schema.tuple.SingleKeyValueTuple; import org.apache.phoenix.schema.tuple.Tuple; @@ -33,21 +34,37 @@ public class UngroupedAggregatingResultIterator extends GroupedAggregatingResult public UngroupedAggregatingResultIterator( PeekingResultIterator resultIterator, Aggregators aggregators) { super(resultIterator, aggregators); } - @Override public Tuple next() throws SQLException { - Tuple result = super.next(); - // Ensure ungrouped aggregregation always returns a row, even if the underlying iterator doesn't. - if (result == null && !hasRows) { - // We should reset ClientAggregators here in case they are being reused in a new ResultIterator. 
- aggregators.reset(aggregators.getAggregators()); - byte[] value = aggregators.toBytes(aggregators.getAggregators()); - result = new SingleKeyValueTuple( - PhoenixKeyValueUtil.newKeyValue(UNGROUPED_AGG_ROW_KEY, - SINGLE_COLUMN_FAMILY, - SINGLE_COLUMN, - AGG_TIMESTAMP, - value)); + Tuple result = resultIterator.next(); + if (result == null) { + // Ensure ungrouped aggregation always returns a row, even if the underlying iterator doesn't. + if (!hasRows) { + // We should reset ClientAggregators here in case they are being reused in a new ResultIterator. + aggregators.reset(aggregators.getAggregators()); + byte[] value = aggregators.toBytes(aggregators.getAggregators()); + result = new SingleKeyValueTuple( + PhoenixKeyValueUtil.newKeyValue(UNGROUPED_AGG_ROW_KEY, + SINGLE_COLUMN_FAMILY, + SINGLE_COLUMN, + AGG_TIMESTAMP, + value)); + } + } else { + Aggregator[] rowAggregators = aggregators.getAggregators(); + aggregators.reset(rowAggregators); + while (true) { + aggregators.aggregate(rowAggregators, result); + Tuple nextResult = resultIterator.peek(); + if (nextResult == null) { + break; + } + result = resultIterator.next(); + } + + byte[] value = aggregators.toBytes(rowAggregators); + Tuple tuple = wrapKeyValueAsResult(PhoenixKeyValueUtil .newKeyValue(UNGROUPED_AGG_ROW_KEY, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, AGG_TIMESTAMP, value, 0, value.length)); + result = tuple; } hasRows = true; return result; diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java index e45c30e..b50af9f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java @@ -327,6 +327,8 @@ public interface QueryServices extends SQLCloseable { public static final String LONG_VIEW_INDEX_ENABLED_ATTRIB = "phoenix.index.longViewIndex.enabled"; // The number of index rows to be rebuild in one RPC call 
public static final String INDEX_REBUILD_PAGE_SIZE_IN_ROWS = "phoenix.index.rebuild_page_size_in_rows"; + // The maximum time in milliseconds to be spent scanning rows in one RPC call + public static final String UNGROUPED_AGGREGATE_PAGE_SIZE_IN_MS = "phoenix.ungrouped.aggregate_page_size_in_ms"; // Before 4.15 when we created a view we included the parent table column metadata in the view diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java index 0a73ca9..2122310 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java @@ -337,7 +337,8 @@ public class QueryServicesOptions { public static final long DEFAULT_GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS = 7*24*60*60*1000; /* 7 days */ public static final boolean DEFAULT_INDEX_REGION_OBSERVER_ENABLED = true; - public static final long DEFAULT_INDEX_REBUILD_PAGE_SIZE_IN_ROWS = 32*1024; + public static final long DEFAULT_INDEX_REBUILD_PAGE_SIZE_IN_ROWS = 32*1000; + public static final long DEFAULT_UNGROUPED_AGGREGATE_PAGE_SIZE_IN_MS = 1000; // 1 second public static final boolean DEFAULT_ALLOW_SPLITTABLE_SYSTEM_CATALOG_ROLLBACK = false; diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java index b2a346f..ccc72ec 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java @@ -20,6 +20,7 @@ package org.apache.phoenix.query; import static org.apache.phoenix.hbase.index.write.ParallelWriterIndexCommitter.NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY; import static org.apache.phoenix.query.QueryConstants.MILLIS_IN_DAY; import static org.apache.phoenix.query.QueryServices.DROP_METADATA_ATTRIB; +import static 
org.apache.phoenix.query.QueryServices.GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS_ATTRIB; import static org.apache.phoenix.util.PhoenixRuntime.CURRENT_SCN_ATTRIB; import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL; import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR; @@ -140,7 +141,6 @@ import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver; import org.apache.phoenix.jdbc.PhoenixTestDriver; import org.apache.phoenix.schema.NewerTableAlreadyExistsException; -import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTableType; import org.apache.phoenix.schema.TableAlreadyExistsException; import org.apache.phoenix.schema.TableNotFoundException; @@ -626,6 +626,13 @@ public abstract class BaseTest { conf.setInt(QueryServices.TASK_HANDLING_INTERVAL_MS_ATTRIB, 10000); conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2); conf.setInt(NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY, 1); + conf.setInt(GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS_ATTRIB, 0); + if (conf.getLong(QueryServices.UNGROUPED_AGGREGATE_PAGE_SIZE_IN_MS, 0) == 0) { + conf.setLong(QueryServices.UNGROUPED_AGGREGATE_PAGE_SIZE_IN_MS, 0); + // This results in processing one row at a time in each next operation of the aggregate region + // scanner, i.e., one row pages. In other words, 0ms page allows only one row to be processed + // within one page; 0ms page is equivalent to one-row page + } return conf; }