PHOENIX-4278 Implement pure client side transactional index maintenance (Ohad Shacham)
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/47dc9359 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/47dc9359 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/47dc9359 Branch: refs/heads/4.x-HBase-1.1 Commit: 47dc935945cda24e0a507a032c3b7c2fe5b2cd43 Parents: aef828d Author: James Taylor <jtay...@salesforce.com> Authored: Mon Feb 12 12:27:10 2018 -0800 Committer: James Taylor <jtay...@salesforce.com> Committed: Mon Feb 12 15:30:00 2018 -0800 ---------------------------------------------------------------------- .../phoenix/end2end/index/BaseIndexIT.java | 2 +- .../phoenix/end2end/index/ImmutableIndexIT.java | 4 +- .../org/apache/phoenix/cache/HashCache.java | 1 + .../phoenix/cache/IndexMetaDataCache.java | 8 + .../apache/phoenix/cache/ServerCacheClient.java | 7 +- .../org/apache/phoenix/cache/TenantCache.java | 2 +- .../apache/phoenix/cache/TenantCacheImpl.java | 4 +- .../apache/phoenix/compile/DeleteCompiler.java | 3 + .../apache/phoenix/compile/UpsertCompiler.java | 2 + .../coprocessor/MetaDataRegionObserver.java | 4 +- .../coprocessor/ServerCachingEndpointImpl.java | 4 +- .../coprocessor/ServerCachingProtocol.java | 2 +- .../UngroupedAggregateRegionObserver.java | 21 +- .../generated/ServerCachingProtos.java | 117 ++++- .../apache/phoenix/execute/MutationState.java | 30 +- .../PhoenixTxnIndexMutationGenerator.java | 519 +++++++++++++++++++ .../hbase/index/covered/LocalTableState.java | 6 - .../phoenix/hbase/index/covered/TableState.java | 7 - .../index/IndexMetaDataCacheFactory.java | 7 +- .../apache/phoenix/index/PhoenixIndexCodec.java | 3 +- .../phoenix/index/PhoenixIndexMetaData.java | 12 + .../index/PhoenixTransactionalIndexer.java | 9 +- .../phoenix/jdbc/PhoenixDatabaseMetaData.java | 1 + .../apache/phoenix/join/HashCacheFactory.java | 34 +- .../index/PhoenixIndexPartialBuildMapper.java | 5 + .../query/ConnectionQueryServicesImpl.java | 8 - 
.../org/apache/phoenix/schema/PTableImpl.java | 25 +- .../apache/phoenix/cache/TenantCacheTest.java | 14 +- .../src/main/ServerCachingService.proto | 1 + 29 files changed, 759 insertions(+), 103 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/47dc9359/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/BaseIndexIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/BaseIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/BaseIndexIT.java index b92da4a..1483c58 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/BaseIndexIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/BaseIndexIT.java @@ -246,7 +246,7 @@ public abstract class BaseIndexIT extends ParallelStatsDisabledIT { PTable table = PhoenixRuntime.getTable(conn, Bytes.toString(tableName)); assertTrue(table.getType() == PTableType.TABLE); // should be data table boolean hasIndexData = iterator.hasNext(); - assertFalse(hasIndexData); // should have no index data + assertFalse(hasIndexData && !transactional); // should have no index data } } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/47dc9359/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java index e0398c7..d520824 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java @@ -75,6 +75,7 @@ import com.google.common.collect.Maps; public class ImmutableIndexIT extends BaseUniqueNamesOwnClusterIT { private final boolean 
localIndex; + private final boolean transactional; private final String tableDDLOptions; private volatile boolean stopThreads = false; @@ -86,6 +87,7 @@ public class ImmutableIndexIT extends BaseUniqueNamesOwnClusterIT { public ImmutableIndexIT(boolean localIndex, boolean transactional, boolean columnEncoded) { StringBuilder optionBuilder = new StringBuilder("IMMUTABLE_ROWS=true"); this.localIndex = localIndex; + this.transactional = transactional; if (!columnEncoded) { if (optionBuilder.length()!=0) optionBuilder.append(","); @@ -250,7 +252,7 @@ public class ImmutableIndexIT extends BaseUniqueNamesOwnClusterIT { Iterator<Pair<byte[], List<KeyValue>>> iterator = PhoenixRuntime.getUncommittedDataIterator(conn); assertTrue(iterator.hasNext()); iterator.next(); - assertEquals(!localIndex, iterator.hasNext()); + assertEquals((!localIndex || transactional), iterator.hasNext()); } // This test is know to flap. We need PHOENIX-2582 to be fixed before enabling this back. http://git-wip-us.apache.org/repos/asf/phoenix/blob/47dc9359/phoenix-core/src/main/java/org/apache/phoenix/cache/HashCache.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/HashCache.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/HashCache.java index 311f119..764fd17 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/cache/HashCache.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/HashCache.java @@ -34,5 +34,6 @@ import org.apache.phoenix.schema.tuple.Tuple; */ @Immutable public interface HashCache extends Closeable { + public int getClientVersion(); public List<Tuple> get(ImmutableBytesPtr hashKey) throws IOException; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/47dc9359/phoenix-core/src/main/java/org/apache/phoenix/cache/IndexMetaDataCache.java ---------------------------------------------------------------------- diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/cache/IndexMetaDataCache.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/IndexMetaDataCache.java index 16207c8..17e6fb6 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/cache/IndexMetaDataCache.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/IndexMetaDataCache.java @@ -23,10 +23,12 @@ import java.io.IOException; import java.util.Collections; import java.util.List; +import org.apache.phoenix.hbase.index.util.VersionUtil; import org.apache.phoenix.index.IndexMaintainer; import org.apache.phoenix.transaction.PhoenixTransactionContext; public interface IndexMetaDataCache extends Closeable { + public static int UNKNOWN_CLIENT_VERSION = VersionUtil.encodeVersion(4, 4, 0); public static final IndexMetaDataCache EMPTY_INDEX_META_DATA_CACHE = new IndexMetaDataCache() { @Override @@ -43,7 +45,13 @@ public interface IndexMetaDataCache extends Closeable { return null; } + @Override + public int getClientVersion() { + return UNKNOWN_CLIENT_VERSION; + } + }; public List<IndexMaintainer> getIndexMaintainers(); public PhoenixTransactionContext getTransactionContext(); + public int getClientVersion(); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/47dc9359/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java index 28a42fa..68de747 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java @@ -43,9 +43,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionLocation; -import org.apache.hadoop.hbase.client.HTable; import 
org.apache.hadoop.hbase.client.HTableInterface; -import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.coprocessor.Batch; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.ipc.BlockingRpcCallback; @@ -53,6 +51,7 @@ import org.apache.hadoop.hbase.ipc.ServerRpcController; import org.apache.hadoop.hbase.util.ByteStringer; import org.apache.hadoop.hbase.util.Bytes; import org.apache.phoenix.compile.ScanRanges; +import org.apache.phoenix.coprocessor.MetaDataProtocol; import org.apache.phoenix.coprocessor.ServerCachingProtocol.ServerCacheFactory; import org.apache.phoenix.coprocessor.generated.ServerCacheFactoryProtos; import org.apache.phoenix.coprocessor.generated.ServerCachingProtos.AddServerCacheRequest; @@ -72,14 +71,11 @@ import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTable.IndexType; import org.apache.phoenix.schema.TableRef; -import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.Closeables; import org.apache.phoenix.util.SQLCloseable; import org.apache.phoenix.util.SQLCloseables; import org.apache.phoenix.util.ScanUtil; -import com.google.common.collect.ImmutableSet; - /** * * Client for sending cache to each region server @@ -497,6 +493,7 @@ public class ServerCacheClient { svrCacheFactoryBuider.setClassName(cacheFactory.getClass().getName()); builder.setCacheFactory(svrCacheFactoryBuider.build()); builder.setTxState(ByteStringer.wrap(txState)); + builder.setClientVersion(MetaDataProtocol.PHOENIX_VERSION); instance.addServerCache(controller, builder.build(), rpcCallback); if (controller.getFailedOn() != null) { throw controller.getFailedOn(); } return rpcCallback.get(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/47dc9359/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCache.java ---------------------------------------------------------------------- diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCache.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCache.java index d30f5dd..c4e82c2 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCache.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCache.java @@ -36,7 +36,7 @@ import org.apache.phoenix.memory.MemoryManager; public interface TenantCache { MemoryManager getMemoryManager(); Closeable getServerCache(ImmutableBytesPtr cacheId); - Closeable addServerCache(ImmutableBytesPtr cacheId, ImmutableBytesWritable cachePtr, byte[] txState, ServerCacheFactory cacheFactory, boolean useProtoForIndexMaintainer) throws SQLException; + Closeable addServerCache(ImmutableBytesPtr cacheId, ImmutableBytesWritable cachePtr, byte[] txState, ServerCacheFactory cacheFactory, boolean useProtoForIndexMaintainer, int clientVersion) throws SQLException; void removeServerCache(ImmutableBytesPtr cacheId); void removeAllServerCache(); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/47dc9359/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCacheImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCacheImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCacheImpl.java index fdf0646..1dc59bc 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCacheImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCacheImpl.java @@ -105,12 +105,12 @@ public class TenantCacheImpl implements TenantCache { } @Override - public Closeable addServerCache(ImmutableBytesPtr cacheId, ImmutableBytesWritable cachePtr, byte[] txState, ServerCacheFactory cacheFactory, boolean useProtoForIndexMaintainer) throws SQLException { + public Closeable addServerCache(ImmutableBytesPtr cacheId, ImmutableBytesWritable cachePtr, byte[] txState, ServerCacheFactory cacheFactory, boolean 
useProtoForIndexMaintainer, int clientVersion) throws SQLException { getServerCaches().cleanUp(); MemoryChunk chunk = this.getMemoryManager().allocate(cachePtr.getLength() + txState.length); boolean success = false; try { - Closeable element = cacheFactory.newCache(cachePtr, txState, chunk, useProtoForIndexMaintainer); + Closeable element = cacheFactory.newCache(cachePtr, txState, chunk, useProtoForIndexMaintainer, clientVersion); getServerCaches().put(cacheId, element); success = true; return element; http://git-wip-us.apache.org/repos/asf/phoenix/blob/47dc9359/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java index 46f4542..6d5b2de 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java @@ -32,12 +32,14 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.phoenix.cache.ServerCacheClient; import org.apache.phoenix.cache.ServerCacheClient.ServerCache; import org.apache.phoenix.compile.GroupByCompiler.GroupBy; import org.apache.phoenix.compile.OrderByCompiler.OrderBy; import org.apache.phoenix.coprocessor.BaseScannerRegionObserver; +import org.apache.phoenix.coprocessor.MetaDataProtocol; import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.exception.SQLExceptionInfo; import org.apache.phoenix.execute.AggregatePlan; @@ -777,6 +779,7 @@ public class DeleteCompiler { byte[] uuidValue = ServerCacheClient.generateId(); 
context.getScan().setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue); context.getScan().setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, ptr.get()); + context.getScan().setAttribute(PhoenixIndexCodec.CLIENT_VERSION, Bytes.toBytes(MetaDataProtocol.PHOENIX_VERSION)); context.getScan().setAttribute(BaseScannerRegionObserver.TX_STATE, txState); } ResultIterator iterator = aggPlan.iterator(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/47dc9359/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java index ba202c8..9a3724e 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java @@ -42,6 +42,7 @@ import org.apache.phoenix.cache.ServerCacheClient; import org.apache.phoenix.compile.GroupByCompiler.GroupBy; import org.apache.phoenix.compile.OrderByCompiler.OrderBy; import org.apache.phoenix.coprocessor.BaseScannerRegionObserver; +import org.apache.phoenix.coprocessor.MetaDataProtocol; import org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver; import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.exception.SQLExceptionInfo; @@ -1015,6 +1016,7 @@ public class UpsertCompiler { byte[] uuidValue = ServerCacheClient.generateId(); scan.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue); scan.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, ptr.get()); + scan.setAttribute(PhoenixIndexCodec.CLIENT_VERSION, Bytes.toBytes(MetaDataProtocol.PHOENIX_VERSION)); scan.setAttribute(BaseScannerRegionObserver.TX_STATE, txState); } ResultIterator iterator = aggPlan.iterator(); 
http://git-wip-us.apache.org/repos/asf/phoenix/blob/47dc9359/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java index df257b8..e46aa0f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java @@ -50,6 +50,7 @@ import org.apache.hadoop.hbase.filter.CompareFilter; import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.regionserver.RegionScanner; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.log4j.Level; import org.apache.log4j.Logger; @@ -484,6 +485,7 @@ public class MetaDataRegionObserver extends BaseRegionObserver { conn); byte[] attribValue = ByteUtil.copyKeyBytesIfNecessary(indexMetaDataPtr); dataTableScan.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, attribValue); + dataTableScan.setAttribute(PhoenixIndexCodec.CLIENT_VERSION, Bytes.toBytes(MetaDataProtocol.PHOENIX_VERSION)); LOG.info("Starting to partially build indexes:" + indexesToPartiallyRebuild + " on data table:" + dataPTable.getName() + " with the earliest disable timestamp:" + earliestDisableTimestamp + " till " @@ -592,4 +594,4 @@ public class MetaDataRegionObserver extends BaseRegionObserver { return QueryUtil.getConnectionOnServerWithCustomUrl(rebuildIndexConnectionProps, REBUILD_INDEX_APPEND_TO_URL_STRING).unwrap(PhoenixConnection.class); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/47dc9359/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java 
---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java index 0944fdf..448f61c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java @@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.coprocessor.CoprocessorService; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.phoenix.cache.GlobalCache; +import org.apache.phoenix.cache.IndexMetaDataCache; import org.apache.phoenix.cache.TenantCache; import org.apache.phoenix.coprocessor.ServerCachingProtocol.ServerCacheFactory; import org.apache.phoenix.coprocessor.generated.ServerCachingProtos.AddServerCacheRequest; @@ -73,7 +74,8 @@ public class ServerCachingEndpointImpl extends ServerCachingService implements C (Class<ServerCacheFactory>) Class.forName(request.getCacheFactory().getClassName()); ServerCacheFactory cacheFactory = serverCacheFactoryClass.newInstance(); tenantCache.addServerCache(new ImmutableBytesPtr(request.getCacheId().toByteArray()), - cachePtr, txState, cacheFactory, request.hasHasProtoBufIndexMaintainer() && request.getHasProtoBufIndexMaintainer()); + cachePtr, txState, cacheFactory, request.hasHasProtoBufIndexMaintainer() && request.getHasProtoBufIndexMaintainer(), + request.hasClientVersion() ? 
request.getClientVersion() : IndexMetaDataCache.UNKNOWN_CLIENT_VERSION); } catch (Throwable e) { ProtobufUtil.setControllerException(controller, ServerUtil.createIOException("Error when adding cache: ", e)); http://git-wip-us.apache.org/repos/asf/phoenix/blob/47dc9359/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingProtocol.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingProtocol.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingProtocol.java index 139a69c..0cc1a1f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingProtocol.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingProtocol.java @@ -36,7 +36,7 @@ import org.apache.phoenix.memory.MemoryManager.MemoryChunk; */ public interface ServerCachingProtocol { public static interface ServerCacheFactory extends Writable { - public Closeable newCache(ImmutableBytesWritable cachePtr, byte[] txState, MemoryChunk chunk, boolean useProtoForIndexMaintainer) throws SQLException; + public Closeable newCache(ImmutableBytesWritable cachePtr, byte[] txState, MemoryChunk chunk, boolean useProtoForIndexMaintainer, int clientVersion) throws SQLException; } /** * Add the cache to the region server cache. 
http://git-wip-us.apache.org/repos/asf/phoenix/blob/47dc9359/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java index 7692bc8..93b42bc 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java @@ -56,7 +56,6 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Scan; @@ -262,7 +261,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver region.batchMutate(mutations.toArray(mutationArray), HConstants.NO_NONCE, HConstants.NO_NONCE); } - private void setIndexAndTransactionProperties(List<Mutation> mutations, byte[] indexUUID, byte[] indexMaintainersPtr, byte[] txState, boolean useIndexProto) { + private void setIndexAndTransactionProperties(List<Mutation> mutations, byte[] indexUUID, byte[] indexMaintainersPtr, byte[] txState, byte[] clientVersionBytes, boolean useIndexProto) { for (Mutation m : mutations) { if (indexMaintainersPtr != null) { m.setAttribute(useIndexProto ? 
PhoenixIndexCodec.INDEX_PROTO_MD : PhoenixIndexCodec.INDEX_MD, indexMaintainersPtr); @@ -273,6 +272,9 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver if (txState != null) { m.setAttribute(BaseScannerRegionObserver.TX_STATE, txState); } + if (clientVersionBytes != null) { + m.setAttribute(PhoenixIndexCodec.CLIENT_VERSION, clientVersionBytes); + } } } @@ -511,6 +513,8 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver indexMaintainersPtr = scan.getAttribute(PhoenixIndexCodec.INDEX_MD); useIndexProto = false; } + + byte[] clientVersionBytes = scan.getAttribute(PhoenixIndexCodec.CLIENT_VERSION); boolean acquiredLock = false; boolean incrScanRefCount = false; try { @@ -747,13 +751,13 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver } if (ServerUtil.readyToCommit(mutations.size(), mutations.byteSize(), maxBatchSize, maxBatchSizeBytes)) { commit(region, mutations, indexUUID, blockingMemStoreSize, indexMaintainersPtr, - txState, targetHTable, useIndexProto, isPKChanging); + txState, targetHTable, useIndexProto, isPKChanging, clientVersionBytes); mutations.clear(); } // Commit in batches based on UPSERT_BATCH_SIZE_BYTES_ATTRIB in config if (ServerUtil.readyToCommit(indexMutations.size(), indexMutations.byteSize(), maxBatchSize, maxBatchSizeBytes)) { - setIndexAndTransactionProperties(indexMutations, indexUUID, indexMaintainersPtr, txState, useIndexProto); + setIndexAndTransactionProperties(indexMutations, indexUUID, indexMaintainersPtr, txState, clientVersionBytes, useIndexProto); commitBatch(region, indexMutations, blockingMemStoreSize); indexMutations.clear(); } @@ -763,7 +767,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver } while (hasMore); if (!mutations.isEmpty()) { commit(region, mutations, indexUUID, blockingMemStoreSize, indexMaintainersPtr, txState, - targetHTable, useIndexProto, isPKChanging); + targetHTable, useIndexProto, 
isPKChanging, clientVersionBytes); mutations.clear(); } @@ -866,11 +870,11 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver private void commit(final Region region, List<Mutation> mutations, byte[] indexUUID, final long blockingMemStoreSize, byte[] indexMaintainersPtr, byte[] txState, final HTable targetHTable, boolean useIndexProto, - boolean isPKChanging) + boolean isPKChanging, byte[] clientVersionBytes) throws IOException { final List<Mutation> localRegionMutations = Lists.newArrayList(); final List<Mutation> remoteRegionMutations = Lists.newArrayList(); - setIndexAndTransactionProperties(mutations, indexUUID, indexMaintainersPtr, txState, useIndexProto); + setIndexAndTransactionProperties(mutations, indexUUID, indexMaintainersPtr, txState, clientVersionBytes, useIndexProto); separateLocalAndRemoteMutations(targetHTable, region, mutations, localRegionMutations, remoteRegionMutations, isPKChanging); try { @@ -1076,6 +1080,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver useProto = false; indexMetaData = scan.getAttribute(PhoenixIndexCodec.INDEX_MD); } + byte[] clientVersionBytes = scan.getAttribute(PhoenixIndexCodec.CLIENT_VERSION); boolean hasMore; int rowCount = 0; try { @@ -1100,6 +1105,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver put.setAttribute(useProto ? PhoenixIndexCodec.INDEX_PROTO_MD : PhoenixIndexCodec.INDEX_MD, indexMetaData); put.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue); put.setAttribute(REPLAY_WRITES, REPLAY_ONLY_INDEX_WRITES); + put.setAttribute(PhoenixIndexCodec.CLIENT_VERSION, clientVersionBytes); mutations.add(put); // Since we're replaying existing mutations, it makes no sense to write them to the wal put.setDurability(Durability.SKIP_WAL); @@ -1111,6 +1117,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver del.setAttribute(useProto ? 
PhoenixIndexCodec.INDEX_PROTO_MD : PhoenixIndexCodec.INDEX_MD, indexMetaData); del.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue); del.setAttribute(REPLAY_WRITES, REPLAY_ONLY_INDEX_WRITES); + del.setAttribute(PhoenixIndexCodec.CLIENT_VERSION, clientVersionBytes); mutations.add(del); // Since we're replaying existing mutations, it makes no sense to write them to the wal del.setDurability(Durability.SKIP_WAL); http://git-wip-us.apache.org/repos/asf/phoenix/blob/47dc9359/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/ServerCachingProtos.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/ServerCachingProtos.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/ServerCachingProtos.java index 3b8984a..f1b03f8 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/ServerCachingProtos.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/ServerCachingProtos.java @@ -5660,6 +5660,16 @@ public final class ServerCachingProtos { * <code>optional bool hasProtoBufIndexMaintainer = 6;</code> */ boolean getHasProtoBufIndexMaintainer(); + + // optional int32 clientVersion = 7; + /** + * <code>optional int32 clientVersion = 7;</code> + */ + boolean hasClientVersion(); + /** + * <code>optional int32 clientVersion = 7;</code> + */ + int getClientVersion(); } /** * Protobuf type {@code AddServerCacheRequest} @@ -5758,6 +5768,11 @@ public final class ServerCachingProtos { hasProtoBufIndexMaintainer_ = input.readBool(); break; } + case 56: { + bitField0_ |= 0x00000040; + clientVersion_ = input.readInt32(); + break; + } } } } catch (com.google.protobuf.InvalidProtocolBufferException e) { @@ -5906,6 +5921,22 @@ public final class ServerCachingProtos { return hasProtoBufIndexMaintainer_; } + // optional int32 clientVersion = 7; + public static final int CLIENTVERSION_FIELD_NUMBER = 7; + 
private int clientVersion_; + /** + * <code>optional int32 clientVersion = 7;</code> + */ + public boolean hasClientVersion() { + return ((bitField0_ & 0x00000040) == 0x00000040); + } + /** + * <code>optional int32 clientVersion = 7;</code> + */ + public int getClientVersion() { + return clientVersion_; + } + private void initFields() { tenantId_ = com.google.protobuf.ByteString.EMPTY; cacheId_ = com.google.protobuf.ByteString.EMPTY; @@ -5913,6 +5944,7 @@ public final class ServerCachingProtos { cacheFactory_ = org.apache.phoenix.coprocessor.generated.ServerCacheFactoryProtos.ServerCacheFactory.getDefaultInstance(); txState_ = com.google.protobuf.ByteString.EMPTY; hasProtoBufIndexMaintainer_ = false; + clientVersion_ = 0; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -5964,6 +5996,9 @@ public final class ServerCachingProtos { if (((bitField0_ & 0x00000020) == 0x00000020)) { output.writeBool(6, hasProtoBufIndexMaintainer_); } + if (((bitField0_ & 0x00000040) == 0x00000040)) { + output.writeInt32(7, clientVersion_); + } getUnknownFields().writeTo(output); } @@ -5997,6 +6032,10 @@ public final class ServerCachingProtos { size += com.google.protobuf.CodedOutputStream .computeBoolSize(6, hasProtoBufIndexMaintainer_); } + if (((bitField0_ & 0x00000040) == 0x00000040)) { + size += com.google.protobuf.CodedOutputStream + .computeInt32Size(7, clientVersion_); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -6050,6 +6089,11 @@ public final class ServerCachingProtos { result = result && (getHasProtoBufIndexMaintainer() == other.getHasProtoBufIndexMaintainer()); } + result = result && (hasClientVersion() == other.hasClientVersion()); + if (hasClientVersion()) { + result = result && (getClientVersion() + == other.getClientVersion()); + } result = result && getUnknownFields().equals(other.getUnknownFields()); return result; @@ -6087,6 +6131,10 @@ public final class ServerCachingProtos { 
hash = (37 * hash) + HASPROTOBUFINDEXMAINTAINER_FIELD_NUMBER; hash = (53 * hash) + hashBoolean(getHasProtoBufIndexMaintainer()); } + if (hasClientVersion()) { + hash = (37 * hash) + CLIENTVERSION_FIELD_NUMBER; + hash = (53 * hash) + getClientVersion(); + } hash = (29 * hash) + getUnknownFields().hashCode(); memoizedHashCode = hash; return hash; @@ -6218,6 +6266,8 @@ public final class ServerCachingProtos { bitField0_ = (bitField0_ & ~0x00000010); hasProtoBufIndexMaintainer_ = false; bitField0_ = (bitField0_ & ~0x00000020); + clientVersion_ = 0; + bitField0_ = (bitField0_ & ~0x00000040); return this; } @@ -6278,6 +6328,10 @@ public final class ServerCachingProtos { to_bitField0_ |= 0x00000020; } result.hasProtoBufIndexMaintainer_ = hasProtoBufIndexMaintainer_; + if (((from_bitField0_ & 0x00000040) == 0x00000040)) { + to_bitField0_ |= 0x00000040; + } + result.clientVersion_ = clientVersion_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -6312,6 +6366,9 @@ public final class ServerCachingProtos { if (other.hasHasProtoBufIndexMaintainer()) { setHasProtoBufIndexMaintainer(other.getHasProtoBufIndexMaintainer()); } + if (other.hasClientVersion()) { + setClientVersion(other.getClientVersion()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -6734,6 +6791,39 @@ public final class ServerCachingProtos { return this; } + // optional int32 clientVersion = 7; + private int clientVersion_ ; + /** + * <code>optional int32 clientVersion = 7;</code> + */ + public boolean hasClientVersion() { + return ((bitField0_ & 0x00000040) == 0x00000040); + } + /** + * <code>optional int32 clientVersion = 7;</code> + */ + public int getClientVersion() { + return clientVersion_; + } + /** + * <code>optional int32 clientVersion = 7;</code> + */ + public Builder setClientVersion(int value) { + bitField0_ |= 0x00000040; + clientVersion_ = value; + onChanged(); + return this; + } + /** + * <code>optional int32 clientVersion = 7;</code> + */ + public Builder 
clearClientVersion() { + bitField0_ = (bitField0_ & ~0x00000040); + clientVersion_ = 0; + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:AddServerCacheRequest) } @@ -8542,22 +8632,23 @@ public final class ServerCachingProtos { "ed\030\020 \002(\010\022\033\n\023indexRowKeyByteSize\030\021 \002(\005\022\021\n" + "\timmutable\030\022 \002(\010\022&\n\021indexedColumnInfo\030\023 " + "\003(\0132\013.ColumnInfo\022\026\n\016encodingScheme\030\024 \002(\005" + - "\022\036\n\026immutableStorageScheme\030\025 \002(\005\"\305\001\n\025Add" + + "\022\036\n\026immutableStorageScheme\030\025 \002(\005\"\334\001\n\025Add" + "ServerCacheRequest\022\020\n\010tenantId\030\001 \001(\014\022\017\n\007" + "cacheId\030\002 \002(\014\022)\n\010cachePtr\030\003 \002(\0132\027.Immuta" + "bleBytesWritable\022)\n\014cacheFactory\030\004 \002(\0132\023" + ".ServerCacheFactory\022\017\n\007txState\030\005 \001(\014\022\"\n\032" + - "hasProtoBufIndexMaintainer\030\006 \001(\010\"(\n\026AddS" + - "erverCacheResponse\022\016\n\006return\030\001 \002(\010\"=\n\030Re", - "moveServerCacheRequest\022\020\n\010tenantId\030\001 \001(\014" + - "\022\017\n\007cacheId\030\002 \002(\014\"+\n\031RemoveServerCacheRe" + - "sponse\022\016\n\006return\030\001 \002(\0102\245\001\n\024ServerCaching" + - "Service\022A\n\016addServerCache\022\026.AddServerCac" + - "heRequest\032\027.AddServerCacheResponse\022J\n\021re" + - "moveServerCache\022\031.RemoveServerCacheReque" + - "st\032\032.RemoveServerCacheResponseBG\n(org.ap" + - "ache.phoenix.coprocessor.generatedB\023Serv" + - "erCachingProtosH\001\210\001\001\240\001\001" + "hasProtoBufIndexMaintainer\030\006 \001(\010\022\025\n\rclie" + + "ntVersion\030\007 \001(\005\"(\n\026AddServerCacheRespons", + "e\022\016\n\006return\030\001 \002(\010\"=\n\030RemoveServerCacheRe" + + "quest\022\020\n\010tenantId\030\001 \001(\014\022\017\n\007cacheId\030\002 \002(\014" + + "\"+\n\031RemoveServerCacheResponse\022\016\n\006return\030" + + "\001 
\002(\0102\245\001\n\024ServerCachingService\022A\n\016addSer" + + "verCache\022\026.AddServerCacheRequest\032\027.AddSe" + + "rverCacheResponse\022J\n\021removeServerCache\022\031" + + ".RemoveServerCacheRequest\032\032.RemoveServer" + + "CacheResponseBG\n(org.apache.phoenix.copr" + + "ocessor.generatedB\023ServerCachingProtosH\001" + + "\210\001\001\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -8593,7 +8684,7 @@ public final class ServerCachingProtos { internal_static_AddServerCacheRequest_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_AddServerCacheRequest_descriptor, - new java.lang.String[] { "TenantId", "CacheId", "CachePtr", "CacheFactory", "TxState", "HasProtoBufIndexMaintainer", }); + new java.lang.String[] { "TenantId", "CacheId", "CachePtr", "CacheFactory", "TxState", "HasProtoBufIndexMaintainer", "ClientVersion", }); internal_static_AddServerCacheResponse_descriptor = getDescriptor().getMessageTypes().get(5); internal_static_AddServerCacheResponse_fieldAccessorTable = new http://git-wip-us.apache.org/repos/asf/phoenix/blob/47dc9359/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java index 3598a1a..6bc566e 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java @@ -50,6 +50,7 @@ import org.apache.phoenix.cache.ServerCacheClient; import org.apache.phoenix.cache.ServerCacheClient.ServerCache; import org.apache.phoenix.compile.MutationPlan; import org.apache.phoenix.coprocessor.BaseScannerRegionObserver; +import 
org.apache.phoenix.coprocessor.MetaDataProtocol; import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult; import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.exception.SQLExceptionInfo; @@ -137,6 +138,7 @@ public class MutationState implements SQLCloseable { private Map<TableRef, MultiRowMutationState> txMutations = Collections.emptyMap(); final PhoenixTransactionContext phoenixTransactionContext; + final PhoenixTxnIndexMutationGenerator phoenixTxnIndexMutationGenerator; private final MutationMetricQueue mutationMetricQueue; private ReadMetricQueue readMetricQueue; @@ -190,6 +192,8 @@ public class MutationState implements SQLCloseable { // as it is not thread safe, so we use the tx member variable phoenixTransactionContext = TransactionFactory.getTransactionFactory().getTransactionContext(txContext, connection, subTask); } + + phoenixTxnIndexMutationGenerator = new PhoenixTxnIndexMutationGenerator(connection, phoenixTransactionContext); } public MutationState(TableRef table, MultiRowMutationState mutations, long sizeOffset, long maxSize, long maxSizeBytes, PhoenixConnection connection) throws SQLException { @@ -491,14 +495,14 @@ public class MutationState implements SQLCloseable { } private Iterator<Pair<PName,List<Mutation>>> addRowMutations(final TableRef tableRef, final MultiRowMutationState values, - final long mutationTimestamp, final long serverTimestamp, boolean includeAllIndexes, final boolean sendAll) { + final long mutationTimestamp, final long serverTimestamp, boolean includeAllIndexes, final boolean sendAll) { final PTable table = tableRef.getTable(); final Iterator<PTable> indexes = // Only maintain tables with immutable rows through this client-side mechanism - includeAllIndexes ? - IndexMaintainer.maintainedIndexes(table.getIndexes().iterator()) : - table.isImmutableRows() ? 
- IndexMaintainer.maintainedGlobalIndexes(table.getIndexes().iterator()) : - Collections.<PTable>emptyIterator(); + (includeAllIndexes || table.isTransactional()) ? + IndexMaintainer.maintainedIndexes(table.getIndexes().iterator()) : + (table.isImmutableRows()) ? + IndexMaintainer.maintainedGlobalIndexes(table.getIndexes().iterator()) : + Collections.<PTable>emptyIterator(); final List<Mutation> mutationList = Lists.newArrayListWithExpectedSize(values.size()); final List<Mutation> mutationsPertainingToIndex = indexes.hasNext() ? Lists.<Mutation>newArrayListWithExpectedSize(values.size()) : null; generateMutations(tableRef, mutationTimestamp, serverTimestamp, values, mutationList, mutationsPertainingToIndex); @@ -519,9 +523,14 @@ public class MutationState implements SQLCloseable { PTable index = indexes.next(); List<Mutation> indexMutations; try { - indexMutations = - IndexUtil.generateIndexData(table, index, values, mutationsPertainingToIndex, + if ((table.isImmutableRows() && (index.getIndexType() != IndexType.LOCAL)) || !table.isTransactional()) { + indexMutations = + IndexUtil.generateIndexData(table, index, values, mutationsPertainingToIndex, connection.getKeyValueBuilder(), connection); + } else { + indexMutations = phoenixTxnIndexMutationGenerator.getIndexUpdates(table, index, mutationsPertainingToIndex); + } + // we may also have to include delete mutations for immutable tables if we are not processing all the tables in the mutations map if (!sendAll) { TableRef key = new TableRef(index); @@ -532,7 +541,7 @@ public class MutationState implements SQLCloseable { indexMutations.addAll(deleteMutations); } } - } catch (SQLException e) { + } catch (SQLException | IOException e) { throw new IllegalDataException(e); } return new Pair<PName,List<Mutation>>(index.getPhysicalName(),indexMutations); @@ -587,7 +596,7 @@ public class MutationState implements SQLCloseable { // Row deletes for index tables are processed by running a re-written query // against the index 
table (as this allows for flexibility in being able to // delete rows). - rowMutationsPertainingToIndex = Collections.emptyList(); + rowMutationsPertainingToIndex = rowMutations; } else { for (Map.Entry<PColumn, byte[]> valueEntry : rowEntry.getValue().getColumnValues() .entrySet()) { @@ -1204,6 +1213,7 @@ public class MutationState implements SQLCloseable { mutation.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue); if (attribValue != null) { mutation.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, attribValue); + mutation.setAttribute(PhoenixIndexCodec.CLIENT_VERSION, Bytes.toBytes(MetaDataProtocol.PHOENIX_VERSION)); if (txState.length > 0) { mutation.setAttribute(BaseScannerRegionObserver.TX_STATE, txState); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/47dc9359/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxnIndexMutationGenerator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxnIndexMutationGenerator.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxnIndexMutationGenerator.java new file mode 100644 index 0000000..b596b75 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxnIndexMutationGenerator.java @@ -0,0 +1,519 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.execute; + +import java.io.IOException; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.ListIterator; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValueUtil; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.phoenix.compile.ScanRanges; +import org.apache.phoenix.coprocessor.BaseScannerRegionObserver; +import org.apache.phoenix.filter.SkipScanFilter; +import org.apache.phoenix.hbase.index.MultiMutation; +import org.apache.phoenix.hbase.index.ValueGetter; +import 
org.apache.phoenix.hbase.index.covered.IndexMetaData; +import org.apache.phoenix.hbase.index.covered.IndexUpdate; +import org.apache.phoenix.hbase.index.covered.TableState; +import org.apache.phoenix.hbase.index.covered.update.ColumnReference; +import org.apache.phoenix.hbase.index.covered.update.ColumnTracker; +import org.apache.phoenix.hbase.index.covered.update.IndexedColumnGroup; +import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; +import org.apache.phoenix.index.IndexMaintainer; +import org.apache.phoenix.index.PhoenixIndexCodec; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.query.KeyRange; +import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.types.PVarbinary; +import org.apache.phoenix.transaction.PhoenixTransactionContext; +import org.apache.phoenix.transaction.PhoenixTransactionContext.PhoenixVisibilityLevel; +import org.apache.phoenix.util.ScanUtil; +import org.apache.phoenix.util.SchemaUtil; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import com.google.common.primitives.Longs; + + +public class PhoenixTxnIndexMutationGenerator { + + private static final Log LOG = LogFactory.getLog(PhoenixTxnIndexMutationGenerator.class); + + private final PhoenixConnection connection; + private final PhoenixTransactionContext phoenixTransactionContext; + + PhoenixTxnIndexMutationGenerator(PhoenixConnection connection, PhoenixTransactionContext phoenixTransactionContext) { + this.phoenixTransactionContext = phoenixTransactionContext; + this.connection = connection; + } + + private static void addMutation(Map<ImmutableBytesPtr, MultiMutation> mutations, ImmutableBytesPtr row, Mutation m) { + MultiMutation stored = mutations.get(row); + // we haven't seen this row before, so add it + if (stored == null) { + stored = new MultiMutation(row); + mutations.put(row, stored); + } + stored.addAll(m); + } + + public List<Mutation> 
getIndexUpdates(final PTable table, PTable index, List<Mutation> dataMutations) throws IOException, SQLException { + + if (dataMutations.isEmpty()) { + return new ArrayList<Mutation>(); + } + + Map<String,byte[]> updateAttributes = dataMutations.get(0).getAttributesMap(); + boolean replyWrite = (BaseScannerRegionObserver.ReplayWrite.fromBytes(updateAttributes.get(BaseScannerRegionObserver.REPLAY_WRITES)) != null); + byte[] txRollbackAttribute = updateAttributes.get(PhoenixTransactionContext.TX_ROLLBACK_ATTRIBUTE_KEY); + + IndexMaintainer maintainer = index.getIndexMaintainer(table, connection); + + boolean isRollback = txRollbackAttribute!=null; + boolean isImmutable = index.isImmutableRows(); + ResultScanner currentScanner = null; + HTableInterface txTable = null; + // Collect up all mutations in batch + Map<ImmutableBytesPtr, MultiMutation> mutations = + new HashMap<ImmutableBytesPtr, MultiMutation>(); + Map<ImmutableBytesPtr, MultiMutation> findPriorValueMutations; + if (isImmutable && !isRollback) { + findPriorValueMutations = new HashMap<ImmutableBytesPtr, MultiMutation>(); + } else { + findPriorValueMutations = mutations; + } + // Collect the set of mutable ColumnReferences so that we can first + // run a scan to get the current state. We'll need this to delete + // the existing index rows. + int estimatedSize = 10; + Set<ColumnReference> mutableColumns = Sets.newHashSetWithExpectedSize(estimatedSize); + // For transactional tables, we use an index maintainer + // to aid in rollback if there's a KeyValue column in the index. The alternative would be + // to hold on to all uncommitted index row keys (even ones already sent to HBase) on the + // client side. 
+ Set<ColumnReference> allColumns = maintainer.getAllColumns(); + mutableColumns.addAll(allColumns); + + for(final Mutation m : dataMutations) { + // add the mutation to the batch set + ImmutableBytesPtr row = new ImmutableBytesPtr(m.getRow()); + // if we have no non PK columns, no need to find the prior values + + boolean requiresPriorRowState = !isImmutable || (maintainer.isRowDeleted(m) && !maintainer.getIndexedColumns().isEmpty()); + if (mutations != findPriorValueMutations && requiresPriorRowState) { + addMutation(findPriorValueMutations, row, m); + } + addMutation(mutations, row, m); + } + + List<Mutation> indexUpdates = new ArrayList<Mutation>(mutations.size() * 2); + try { + // Track if we have row keys with Delete mutations (or Puts that are + // Tephra's Delete marker). If there are none, we don't need to do the scan for + // prior versions, if there are, we do. Since rollbacks always have delete mutations, + // this logic will work there too. + if (!findPriorValueMutations.isEmpty()) { + List<KeyRange> keys = Lists.newArrayListWithExpectedSize(mutations.size()); + for (ImmutableBytesPtr ptr : findPriorValueMutations.keySet()) { + keys.add(PVarbinary.INSTANCE.getKeyRange(ptr.copyBytesIfNecessary())); + } + Scan scan = new Scan(); + // Project all mutable columns + for (ColumnReference ref : mutableColumns) { + scan.addColumn(ref.getFamily(), ref.getQualifier()); + } + /* + * Indexes inherit the storage scheme of the data table which means all the indexes have the same + * storage scheme and empty key value qualifier. Note that this assumption would be broken if we start + * supporting new indexes over existing data tables to have a different storage scheme than the data + * table. 
+ */ + byte[] emptyKeyValueQualifier = maintainer.getEmptyKeyValueQualifier(); + + // Project empty key value column + scan.addColumn(maintainer.getDataEmptyKeyValueCF(), emptyKeyValueQualifier); + ScanRanges scanRanges = ScanRanges.create(SchemaUtil.VAR_BINARY_SCHEMA, Collections.singletonList(keys), ScanUtil.SINGLE_COLUMN_SLOT_SPAN, KeyRange.EVERYTHING_RANGE, null, true, -1); + scanRanges.initializeScan(scan); + txTable = connection.getQueryServices().getTable(table.getPhysicalName().getBytes()); + // For rollback, we need to see all versions, including + // the last committed version as there may be multiple + // checkpointed versions. + SkipScanFilter filter = scanRanges.getSkipScanFilter(); + if (isRollback) { + filter = new SkipScanFilter(filter,true); + phoenixTransactionContext.setVisibilityLevel(PhoenixVisibilityLevel.SNAPSHOT_ALL); + } + scan.setFilter(filter); + currentScanner = txTable.getScanner(scan); + } + if (isRollback) { + processRollback(maintainer, txRollbackAttribute, currentScanner, mutableColumns, indexUpdates, mutations, replyWrite, table); + } else { + processMutation(maintainer, txRollbackAttribute, currentScanner, mutableColumns, indexUpdates, mutations, findPriorValueMutations, replyWrite, table); + } + } finally { + if (txTable != null) txTable.close(); + } + + return indexUpdates; + } + + private void processMutation(IndexMaintainer maintainer, + byte[] txRollbackAttribute, + ResultScanner scanner, + Set<ColumnReference> upsertColumns, + Collection<Mutation> indexUpdates, + Map<ImmutableBytesPtr, MultiMutation> mutations, + Map<ImmutableBytesPtr, MultiMutation> mutationsToFindPreviousValue, + boolean replyWrite, + final PTable table) throws IOException, SQLException { + if (scanner != null) { + Result result; + ColumnReference emptyColRef = new ColumnReference(maintainer + .getDataEmptyKeyValueCF(), maintainer.getEmptyKeyValueQualifier()); + // Process existing data table rows by removing the old index row and adding the new index row 
+ while ((result = scanner.next()) != null) { + Mutation m = mutationsToFindPreviousValue.remove(new ImmutableBytesPtr(result.getRow())); + TxTableState state = new TxTableState(upsertColumns, phoenixTransactionContext.getWritePointer(), m, emptyColRef, result); + generateDeletes(indexUpdates, txRollbackAttribute, state, maintainer, replyWrite, table); + generatePuts(indexUpdates, state, maintainer, replyWrite, table); + } + } + // Process new data table by adding new index rows + for (Mutation m : mutations.values()) { + TxTableState state = new TxTableState(upsertColumns, phoenixTransactionContext.getWritePointer(), m); + generatePuts(indexUpdates, state, maintainer, replyWrite, table); + generateDeletes(indexUpdates, txRollbackAttribute, state, maintainer, replyWrite, table); + } + } + + private void processRollback(IndexMaintainer maintainer, + byte[] txRollbackAttribute, + ResultScanner scanner, + Set<ColumnReference> mutableColumns, + Collection<Mutation> indexUpdates, + Map<ImmutableBytesPtr, MultiMutation> mutations, + boolean replyWrite, + final PTable table) throws IOException, SQLException { + if (scanner != null) { + Result result; + // Loop through last committed row state plus all new rows associated with current transaction + // to generate point delete markers for all index rows that were added. We don't have Tephra + // manage index rows in change sets because we don't want to be hit with the additional + // memory hit and do not need to do conflict detection on index rows. + ColumnReference emptyColRef = new ColumnReference(maintainer.getDataEmptyKeyValueCF(), maintainer.getEmptyKeyValueQualifier()); + while ((result = scanner.next()) != null) { + Mutation m = mutations.remove(new ImmutableBytesPtr(result.getRow())); + // Sort by timestamp, type, cf, cq so we can process in time batches from oldest to newest + // (as if we're "replaying" them in time order). 
+ List<Cell> cells = result.listCells(); + Collections.sort(cells, new Comparator<Cell>() { + + @Override + public int compare(Cell o1, Cell o2) { + int c = Longs.compare(o1.getTimestamp(), o2.getTimestamp()); + if (c != 0) return c; + c = o1.getTypeByte() - o2.getTypeByte(); + if (c != 0) return c; + c = Bytes.compareTo(o1.getFamilyArray(), o1.getFamilyOffset(), o1.getFamilyLength(), o2.getFamilyArray(), o2.getFamilyOffset(), o2.getFamilyLength()); + if (c != 0) return c; + return Bytes.compareTo(o1.getQualifierArray(), o1.getQualifierOffset(), o1.getQualifierLength(), o2.getQualifierArray(), o2.getQualifierOffset(), o2.getQualifierLength()); + } + + }); + int i = 0; + int nCells = cells.size(); + Result oldResult = null, newResult; + long readPtr = phoenixTransactionContext.getReadPointer(); + do { + boolean hasPuts = false; + LinkedList<Cell> singleTimeCells = Lists.newLinkedList(); + long writePtr; + Cell cell = cells.get(i); + do { + hasPuts |= cell.getTypeByte() == KeyValue.Type.Put.getCode(); + writePtr = cell.getTimestamp(); + ListIterator<Cell> it = singleTimeCells.listIterator(); + do { + // Add at the beginning of the list to match the expected HBase + // newest to oldest sort order (which TxTableState relies on + // with the Result.getColumnLatestCell() calls). However, we + // still want to add Cells in the expected order for each time + // bound as otherwise we won't find it in our old state. + it.add(cell); + } while (++i < nCells && (cell = cells.get(i)).getTimestamp() == writePtr); + } while (i < nCells && cell.getTimestamp() <= readPtr); + + // Generate point delete markers for the prior row deletion of the old index value. + // The write timestamp is the next timestamp, not the current timestamp, + // as the earliest cells are the current values for the row (and we don't + // want to delete the current row). 
+ if (oldResult != null) { + TxTableState state = new TxTableState(mutableColumns, writePtr, m, emptyColRef, oldResult); + generateDeletes(indexUpdates, txRollbackAttribute, state, maintainer, replyWrite, table); + } + // Generate point delete markers for the new index value. + // If our time batch doesn't have Puts (i.e. we have only Deletes), then do not + // generate deletes. We would have generated the delete above based on the state + // of the previous row. The delete markers do not give us the state we need to + // delete. + if (hasPuts) { + newResult = Result.create(singleTimeCells); + // First row may represent the current state which we don't want to delete + if (writePtr > readPtr) { + TxTableState state = new TxTableState(mutableColumns, writePtr, m, emptyColRef, newResult); + generateDeletes(indexUpdates, txRollbackAttribute, state, maintainer, replyWrite, table); + } + oldResult = newResult; + } else { + oldResult = null; + } + } while (i < nCells); + } + } + } + + private Iterable<IndexUpdate> getIndexUpserts(TableState state, IndexMaintainer maintainer, boolean replyWrite, final PTable table) throws IOException, SQLException { + if (maintainer.isRowDeleted(state.getPendingUpdate())) { + return Collections.emptyList(); + } + ImmutableBytesWritable ptr = new ImmutableBytesWritable(); + ptr.set(state.getCurrentRowKey()); + List<IndexUpdate> indexUpdates = Lists.newArrayList(); + Pair<ValueGetter, IndexUpdate> statePair = state.getIndexUpdateState(maintainer.getAllColumns(), replyWrite, false, null); + ValueGetter valueGetter = statePair.getFirst(); + IndexUpdate indexUpdate = statePair.getSecond(); + indexUpdate.setTable(maintainer.isLocalIndex() ? 
table.getName().getBytes() : maintainer.getIndexTableName()); + + byte[] regionStartKey = null; + byte[] regionEndkey = null; + if(maintainer.isLocalIndex()) { + HRegionLocation tableRegionLocation = connection.getQueryServices().getTableRegionLocation(table.getPhysicalName().getBytes(), state.getCurrentRowKey()); + regionStartKey = tableRegionLocation.getRegionInfo().getStartKey(); + regionEndkey = tableRegionLocation.getRegionInfo().getEndKey(); + } + + Put put = maintainer.buildUpdateMutation(PhoenixIndexCodec.KV_BUILDER, valueGetter, ptr, state.getCurrentTimestamp(), regionStartKey, regionEndkey); + indexUpdate.setUpdate(put); + indexUpdates.add(indexUpdate); + + return indexUpdates; + } +

    /**
     * Builds the index-row delete that corresponds to the data row currently held by {@code state}.
     *
     * @param state      row state supplying the row key, timestamp, pending cells and value getter
     * @param maintainer maintainer of the index being updated
     * @param replyWrite forwarded to {@link TableState#getIndexUpdateState} as its
     *                   {@code ignoreNewerMutations} argument
     * @param table      data table; supplies the target table name and, for local indexes, the
     *                   physical table whose region boundaries are looked up
     * @return a list holding at most one {@link IndexUpdate}; empty when the state returns a
     *         {@code null} value getter
     * @throws IOException  if the row state or the delete mutation cannot be built
     * @throws SQLException if the region location lookup fails
     */
    private Iterable<IndexUpdate> getIndexDeletes(TableState state, IndexMaintainer maintainer, boolean replyWrite, final PTable table) throws IOException, SQLException {
        ImmutableBytesWritable ptr = new ImmutableBytesWritable();
        ptr.set(state.getCurrentRowKey());
        List<IndexUpdate> indexUpdates = Lists.newArrayList();
        // For transactional tables, we use an index maintainer
        // to aid in rollback if there's a KeyValue column in the index. The alternative would be
        // to hold on to all uncommitted index row keys (even ones already sent to HBase) on the
        // client side.
        Set<ColumnReference> cols = Sets.newHashSet(maintainer.getAllColumns());
        cols.add(new ColumnReference(maintainer.getDataEmptyKeyValueCF(), maintainer.getEmptyKeyValueQualifier()));
        Pair<ValueGetter, IndexUpdate> statePair = state.getIndexUpdateState(cols, replyWrite, true, null);
        ValueGetter valueGetter = statePair.getFirst();
        if (valueGetter == null) {
            // No row state to delete from: report no index updates.
            return indexUpdates;
        }

        IndexUpdate indexUpdate = statePair.getSecond();
        indexUpdate.setTable(maintainer.isLocalIndex() ? table.getName().getBytes() : maintainer.getIndexTableName());

        // Region boundaries are only resolved for local indexes; they stay null otherwise.
        byte[] regionStartKey = null;
        byte[] regionEndKey = null;
        if (maintainer.isLocalIndex()) {
            HRegionLocation tableRegionLocation = connection.getQueryServices().getTableRegionLocation(table.getPhysicalName().getBytes(), state.getCurrentRowKey());
            regionStartKey = tableRegionLocation.getRegionInfo().getStartKey();
            regionEndKey = tableRegionLocation.getRegionInfo().getEndKey();
        }

        Delete delete = maintainer.buildDeleteMutation(PhoenixIndexCodec.KV_BUILDER, valueGetter, ptr, state.getPendingUpdate(),
                state.getCurrentTimestamp(), regionStartKey, regionEndKey);
        indexUpdate.setUpdate(delete);
        indexUpdates.add(indexUpdate);
        return indexUpdates;
    }

    /**
     * Appends every valid index delete for the current row state to {@code indexUpdates}, tagging
     * each one with the transaction rollback attribute.
     *
     * @param indexUpdates collection that receives the generated delete mutations
     * @param attribValue  value stored under {@link PhoenixTransactionContext#TX_ROLLBACK_ATTRIBUTE_KEY}
     * @param state        row state the deletes are derived from
     * @param maintainer   maintainer of the index being updated
     * @param replyWrite   passed through to {@code getIndexDeletes}
     * @param table        data table the index belongs to
     */
    private void generateDeletes(Collection<Mutation> indexUpdates,
            byte[] attribValue,
            TxTableState state,
            IndexMaintainer maintainer,
            boolean replyWrite,
            final PTable table) throws IOException, SQLException {
        for (IndexUpdate delete : getIndexDeletes(state, maintainer, replyWrite, table)) {
            if (!delete.isValid()) {
                continue;
            }
            Mutation update = delete.getUpdate();
            update.setAttribute(PhoenixTransactionContext.TX_ROLLBACK_ATTRIBUTE_KEY, attribValue);
            indexUpdates.add(update);
        }
    }

    /**
     * Folds the pending mutation into {@code state} and then appends every valid index upsert for
     * the resulting row state to {@code indexUpdates}.
     *
     * @param indexUpdates collection that receives the generated put mutations
     * @param state        row state; mutated by this call via {@code applyMutation()}
     * @param maintainer   maintainer of the index being updated
     * @param replyWrite   passed through to {@code getIndexUpserts}
     * @param table        data table the index belongs to
     * @return {@code true} if at least one valid put was added
     */
    private boolean generatePuts(Collection<Mutation> indexUpdates,
            TxTableState state,
            IndexMaintainer maintainer,
            boolean replyWrite,
            final PTable table) throws IOException, SQLException {
        // The upserts must reflect the row as it looks after the mutation, so apply it first.
        state.applyMutation();
        boolean validPut = false;
        for (IndexUpdate put : getIndexUpserts(state, maintainer, replyWrite, table)) {
            if (put.isValid()) {
                indexUpdates.add(put.getUpdate());
                validPut = true;
            }
        }
        return validPut;
    }


    /**
     * In-memory {@link TableState} for a single data row mutation. Column values are kept in a map
     * keyed by {@link ColumnReference}; {@link #applyMutation()} folds the mutation's cells into
     * that map.
     */
    private static class TxTableState implements TableState {
        private final Mutation mutation;
        private final long currentTimestamp;
        private final List<KeyValue> pendingUpdates;
        private final Set<ColumnReference> indexedColumns;
        private final Map<ColumnReference, ImmutableBytesWritable> valueMap;
        // Reads straight from valueMap/mutation, so one instance can serve every
        // getIndexUpdateState() call instead of allocating a new getter each time.
        private final ValueGetter valueGetter;

        /**
         * Creates a state with an empty value map and copies the mutation's cells into the
         * pending-update list.
         */
        private TxTableState(Set<ColumnReference> indexedColumns, long currentTimestamp, Mutation mutation) {
            this.currentTimestamp = currentTimestamp;
            this.indexedColumns = indexedColumns;
            this.mutation = mutation;
            int estimatedSize = indexedColumns.size();
            this.valueMap = Maps.newHashMapWithExpectedSize(estimatedSize);
            this.pendingUpdates = Lists.newArrayListWithExpectedSize(estimatedSize);
            this.valueGetter = new ValueGetter() {

                @Override
                public ImmutableBytesWritable getLatestValue(ColumnReference ref, long ts) throws IOException {
                    return valueMap.get(ref);
                }

                @Override
                public byte[] getRowKey() {
                    return TxTableState.this.mutation.getRow();
                }

            };
            try {
                CellScanner scanner = mutation.cellScanner();
                while (scanner.advance()) {
                    Cell cell = scanner.current();
                    pendingUpdates.add(KeyValueUtil.ensureKeyValue(cell));
                }
            } catch (IOException e) {
                throw new RuntimeException(e); // Impossible
            }
        }

        /**
         * Creates a state whose value map is seeded with the latest cell of each indexed column
         * found in {@code r}.
         *
         * @param indexedColumns   columns whose values are tracked
         * @param currentTimestamp timestamp reported by {@link #getCurrentTimestamp()}
         * @param m                mutation being applied to the row
         * @param emptyColRef      not read by this constructor
         * @param r                existing row contents used to seed the value map
         */
        public TxTableState(Set<ColumnReference> indexedColumns, long currentTimestamp, Mutation m, ColumnReference emptyColRef, Result r) {
            this(indexedColumns, currentTimestamp, m);

            for (ColumnReference ref : indexedColumns) {
                Cell cell = r.getColumnLatestCell(ref.getFamily(), ref.getQualifier());
                if (cell != null) {
                    valueMap.put(ref, valueOf(cell));
                }
            }
        }

        /** Wraps the value bytes of {@code cell} without copying them. */
        private static ImmutableBytesWritable valueOf(Cell cell) {
            ImmutableBytesWritable ptr = new ImmutableBytesWritable();
            ptr.set(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
            return ptr;
        }

        /** Builds the column reference (family + qualifier) addressed by {@code cell}. */
        private static ColumnReference columnOf(Cell cell) {
            return new ColumnReference(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(), cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
        }

        /** Always returns {@code null}; this state holds no coprocessor environment. */
        @Override
        public RegionCoprocessorEnvironment getEnvironment() {
            return null;
        }

        @Override
        public long getCurrentTimestamp() {
            return currentTimestamp;
        }

        @Override
        public byte[] getCurrentRowKey() {
            return mutation.getRow();
        }

        @Override
        public List<? extends IndexedColumnGroup> getIndexColumnHints() {
            return Collections.emptyList();
        }

        /**
         * Folds the pending cells into the value map, in order: column deletes drop the column,
         * family deletes drop every indexed column of that family, and puts overwrite the value
         * of indexed columns.
         *
         * @throws IllegalStateException if a cell has any other type
         */
        private void applyMutation() {
            for (Cell cell : pendingUpdates) {
                byte type = cell.getTypeByte();
                if (type == KeyValue.Type.Delete.getCode() || type == KeyValue.Type.DeleteColumn.getCode()) {
                    valueMap.remove(columnOf(cell));
                } else if (type == KeyValue.Type.DeleteFamily.getCode() || type == KeyValue.Type.DeleteFamilyVersion.getCode()) {
                    for (ColumnReference ref : indexedColumns) {
                        if (ref.matchesFamily(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength())) {
                            valueMap.remove(ref);
                        }
                    }
                } else if (type == KeyValue.Type.Put.getCode()) {
                    ColumnReference ref = columnOf(cell);
                    // Values of columns that are not indexed are not tracked.
                    if (indexedColumns.contains(ref)) {
                        valueMap.put(ref, valueOf(cell));
                    }
                } else {
                    throw new IllegalStateException("Unexpected mutation type for " + cell);
                }
            }
        }

        @Override
        public Collection<KeyValue> getPendingUpdate() {
            return pendingUpdates;
        }

        /**
         * Returns the shared value getter paired with a fresh {@link IndexUpdate} tracking
         * {@code indexedColumns}. The two boolean flags and {@code indexMetaData} are not
         * consulted by this implementation, and the getter is never {@code null}.
         */
        @Override
        public Pair<ValueGetter, IndexUpdate> getIndexUpdateState(Collection<? extends ColumnReference> indexedColumns, boolean ignoreNewerMutations, boolean returnNullScannerIfRowNotFound, IndexMetaData indexMetaData)
                throws IOException {
            // The tracker depends on the requested columns, so it is still built per call.
            ColumnTracker tracker = new ColumnTracker(indexedColumns);
            return new Pair<ValueGetter, IndexUpdate>(valueGetter, new IndexUpdate(tracker));
        }
    }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/47dc9359/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java index f7784e5..9bd4db8 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java @@ -15,7 +15,6 @@ import java.util.Arrays; import java.util.Collection; import java.util.HashSet; import java.util.List; -import java.util.Map; import java.util.Set; import org.apache.hadoop.hbase.Cell; @@ -218,11 +217,6 @@ public class LocalTableState implements TableState { } @Override - public Map<String, byte[]> getUpdateAttributes() { - return this.update.getAttributesMap(); - } - - @Override public byte[] getCurrentRowKey() { return this.update.getRow(); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/47dc9359/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/TableState.java
---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/TableState.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/TableState.java index f85de59..605cbe3 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/TableState.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/TableState.java @@ -21,10 +21,8 @@ package org.apache.phoenix.hbase.index.covered; import java.io.IOException; import java.util.Collection; import java.util.List; -import java.util.Map; import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.util.Pair; import org.apache.phoenix.hbase.index.ValueGetter; @@ -51,11 +49,6 @@ public interface TableState { public long getCurrentTimestamp(); /** - * @return the attributes attached to the current update (e.g. {@link Mutation}). - */ - public Map<String, byte[]> getUpdateAttributes(); - - /** * Get a getter interface for the state of the index row * @param indexedColumns list of indexed columns. * @param ignoreNewerMutations ignore mutations newer than m when determining current state. 
Useful http://git-wip-us.apache.org/repos/asf/phoenix/blob/47dc9359/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheFactory.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheFactory.java index 18b9edd..03db767 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheFactory.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheFactory.java @@ -45,7 +45,7 @@ public class IndexMetaDataCacheFactory implements ServerCacheFactory { } @Override - public Closeable newCache (ImmutableBytesWritable cachePtr, byte[] txState, final MemoryChunk chunk, boolean useProtoForIndexMaintainer) throws SQLException { + public Closeable newCache (ImmutableBytesWritable cachePtr, byte[] txState, final MemoryChunk chunk, boolean useProtoForIndexMaintainer, final int clientVersion) throws SQLException { // just use the standard keyvalue builder - this doesn't really need to be fast final List<IndexMaintainer> maintainers = @@ -72,6 +72,11 @@ public class IndexMetaDataCacheFactory implements ServerCacheFactory { public PhoenixTransactionContext getTransactionContext() { return txnContext; } + + @Override + public int getClientVersion() { + return clientVersion; + } }; } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/47dc9359/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java index ffb199a..ebad7da 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java 
@@ -44,7 +44,8 @@ public class PhoenixIndexCodec extends BaseIndexCodec { public static final String INDEX_PROTO_MD = "IdxProtoMD"; public static final String INDEX_UUID = "IdxUUID"; public static final String INDEX_MAINTAINERS = "IndexMaintainers"; - private static KeyValueBuilder KV_BUILDER = GenericKeyValueBuilder.INSTANCE; + public static final String CLIENT_VERSION = "_ClientVersion"; + public static KeyValueBuilder KV_BUILDER = GenericKeyValueBuilder.INSTANCE; private RegionCoprocessorEnvironment env; http://git-wip-us.apache.org/repos/asf/phoenix/blob/47dc9359/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java index 05371a6..cc254d3 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java @@ -24,6 +24,7 @@ import java.util.Map; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.phoenix.cache.GlobalCache; import org.apache.phoenix.cache.IndexMetaDataCache; import org.apache.phoenix.cache.ServerCacheClient; @@ -60,6 +61,8 @@ public class PhoenixIndexMetaData implements IndexMetaData { if (md != null) { final List<IndexMaintainer> indexMaintainers = IndexMaintainer.deserialize(md, useProto); final PhoenixTransactionContext txnContext = TransactionFactory.getTransactionFactory().getTransactionContext(txState); + byte[] clientVersionBytes = attributes.get(PhoenixIndexCodec.CLIENT_VERSION); + final int clientVersion = clientVersionBytes == null ? 
IndexMetaDataCache.UNKNOWN_CLIENT_VERSION : Bytes.toInt(clientVersionBytes); return new IndexMetaDataCache() { @Override @@ -75,6 +78,11 @@ public class PhoenixIndexMetaData implements IndexMetaData { return txnContext; } + @Override + public int getClientVersion() { + return clientVersion; + } + }; } else { byte[] tenantIdBytes = attributes.get(PhoenixRuntime.TENANT_ID_ATTRIB); @@ -127,6 +135,10 @@ public class PhoenixIndexMetaData implements IndexMetaData { return attributes; } + public int getClientVersion() { + return indexMetaDataCache.getClientVersion(); + } + @Override public ReplayWrite getReplayWrite() { return replayWrite; http://git-wip-us.apache.org/repos/asf/phoenix/blob/47dc9359/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java index 3495267..eaddf62 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java @@ -77,6 +77,7 @@ import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.hbase.index.write.IndexWriter; import org.apache.phoenix.hbase.index.write.LeaveIndexActiveFailurePolicy; import org.apache.phoenix.hbase.index.write.ParallelWriterIndexCommitter; +import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; import org.apache.phoenix.query.KeyRange; import org.apache.phoenix.schema.types.PVarbinary; import org.apache.phoenix.trace.TracingUtils; @@ -193,6 +194,10 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver { Map<String,byte[]> updateAttributes = m.getAttributesMap(); PhoenixIndexMetaData indexMetaData = new PhoenixIndexMetaData(c.getEnvironment(),updateAttributes); + if 
(indexMetaData.getClientVersion() >= PhoenixDatabaseMetaData.MIN_TX_CLIENT_SIDE_MAINTENANCE) { + super.preBatchMutate(c, miniBatchOp); + return; + } byte[] txRollbackAttribute = m.getAttribute(PhoenixTransactionContext.TX_ROLLBACK_ATTRIBUTE_KEY); Collection<Pair<Mutation, byte[]>> indexUpdates = null; // get the current span, or just use a null-span to avoid a bunch of if statements @@ -557,10 +562,6 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver { return currentTimestamp; } - @Override - public Map<String, byte[]> getUpdateAttributes() { - return attributes; - } @Override public byte[] getCurrentRowKey() { http://git-wip-us.apache.org/repos/asf/phoenix/blob/47dc9359/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java index 094f743..b88b381 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java @@ -323,6 +323,7 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData { public static final int MIN_NAMESPACE_MAPPED_PHOENIX_VERSION = VersionUtil.encodeVersion("4", "8", "0"); public static final int MIN_PENDING_ACTIVE_INDEX = VersionUtil.encodeVersion("4", "12", "0"); public static final int MIN_PENDING_DISABLE_INDEX = VersionUtil.encodeVersion("4", "14", "0"); + public static final int MIN_TX_CLIENT_SIDE_MAINTENANCE = VersionUtil.encodeVersion("4", "14", "0"); // Version below which we should turn off essential column family. 
public static final int ESSENTIAL_FAMILY_VERSION_THRESHOLD = VersionUtil.encodeVersion("0", "94", "7"); http://git-wip-us.apache.org/repos/asf/phoenix/blob/47dc9359/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java index a8ddd62..4fc3c70 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java @@ -17,16 +17,25 @@ */ package org.apache.phoenix.join; -import java.io.*; +import java.io.ByteArrayInputStream; +import java.io.Closeable; +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.DataOutput; +import java.io.IOException; import java.sql.SQLException; -import java.util.*; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; import net.jcip.annotations.Immutable; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.WritableUtils; - import org.apache.phoenix.cache.HashCache; import org.apache.phoenix.coprocessor.ServerCachingProtocol.ServerCacheFactory; import org.apache.phoenix.exception.SQLExceptionCode; @@ -37,8 +46,10 @@ import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.memory.MemoryManager.MemoryChunk; import org.apache.phoenix.schema.tuple.ResultTuple; import org.apache.phoenix.schema.tuple.Tuple; -import org.apache.phoenix.util.*; - +import org.apache.phoenix.util.ResultUtil; +import org.apache.phoenix.util.ServerUtil; +import org.apache.phoenix.util.SizedUtil; +import org.apache.phoenix.util.TupleUtil; import org.iq80.snappy.CorruptionException; import 
org.iq80.snappy.Snappy; @@ -56,14 +67,14 @@ public class HashCacheFactory implements ServerCacheFactory { } @Override - public Closeable newCache(ImmutableBytesWritable cachePtr, byte[] txState, MemoryChunk chunk, boolean useProtoForIndexMaintainer) throws SQLException { + public Closeable newCache(ImmutableBytesWritable cachePtr, byte[] txState, MemoryChunk chunk, boolean useProtoForIndexMaintainer, int clientVersion) throws SQLException { try { // This reads the uncompressed length from the front of the compressed input int uncompressedLen = Snappy.getUncompressedLength(cachePtr.get(), cachePtr.getOffset()); byte[] uncompressed = new byte[uncompressedLen]; Snappy.uncompress(cachePtr.get(), cachePtr.getOffset(), cachePtr.getLength(), uncompressed, 0); - return new HashCacheImpl(uncompressed, chunk); + return new HashCacheImpl(uncompressed, chunk, clientVersion); } catch (CorruptionException e) { throw ServerUtil.parseServerException(e); } @@ -74,10 +85,12 @@ public class HashCacheFactory implements ServerCacheFactory { private final Map<ImmutableBytesPtr,List<Tuple>> hashCache; private final MemoryChunk memoryChunk; private final boolean singleValueOnly; + private final int clientVersion; - private HashCacheImpl(byte[] hashCacheBytes, MemoryChunk memoryChunk) { + private HashCacheImpl(byte[] hashCacheBytes, MemoryChunk memoryChunk, int clientVersion) { try { this.memoryChunk = memoryChunk; + this.clientVersion = clientVersion; byte[] hashCacheByteArray = hashCacheBytes; int offset = 0; ByteArrayInputStream input = new ByteArrayInputStream(hashCacheByteArray, offset, hashCacheBytes.length); @@ -140,6 +153,11 @@ public class HashCacheFactory implements ServerCacheFactory { return ret; } + + @Override + public int getClientVersion() { + return clientVersion; + } } }