Repository: phoenix Updated Branches: refs/heads/5.x-HBase-2.0 9ff02ea24 -> 1123f96cb
http://git-wip-us.apache.org/repos/asf/phoenix/blob/1123f96c/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexPartialBuildMapper.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexPartialBuildMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexPartialBuildMapper.java index 60f07b7..1ad1ce5 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexPartialBuildMapper.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexPartialBuildMapper.java @@ -33,9 +33,11 @@ import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableMapper; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.IntWritable; import org.apache.phoenix.cache.ServerCacheClient; import org.apache.phoenix.coprocessor.BaseScannerRegionObserver; +import org.apache.phoenix.coprocessor.MetaDataProtocol; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.index.PhoenixIndexCodec; import org.apache.phoenix.jdbc.PhoenixConnection; @@ -105,6 +107,7 @@ public class PhoenixIndexPartialBuildMapper extends TableMapper<ImmutableBytesWr try { byte[] attribValue = ByteUtil.copyKeyBytesIfNecessary(maintainers); byte[] uuidValue = ServerCacheClient.generateId(); + byte[] clientVersion = Bytes.toBytes(MetaDataProtocol.PHOENIX_VERSION); Put put = null; Delete del = null; for (Cell cell : value.rawCells()) { @@ -113,6 +116,7 @@ public class PhoenixIndexPartialBuildMapper extends TableMapper<ImmutableBytesWr put = new Put(CellUtil.cloneRow(cell)); put.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue); put.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, attribValue); + put.setAttribute(PhoenixIndexCodec.CLIENT_VERSION, clientVersion); put.setAttribute(BaseScannerRegionObserver.REPLAY_WRITES, BaseScannerRegionObserver.REPLAY_ONLY_INDEX_WRITES); mutations.add(put); } @@ -122,6 +126,7 @@ public class PhoenixIndexPartialBuildMapper extends TableMapper<ImmutableBytesWr del = new Delete(CellUtil.cloneRow(cell)); del.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue); del.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, attribValue); + del.setAttribute(PhoenixIndexCodec.CLIENT_VERSION, clientVersion); del.setAttribute(BaseScannerRegionObserver.REPLAY_WRITES, BaseScannerRegionObserver.REPLAY_ONLY_INDEX_WRITES); mutations.add(del); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/1123f96c/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java index 5c5a42e..e17471d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java @@ -181,7 +181,6 @@ import org.apache.phoenix.hbase.index.util.KeyValueBuilder; import org.apache.phoenix.hbase.index.util.VersionUtil; import org.apache.phoenix.index.PhoenixIndexBuilder; import org.apache.phoenix.index.PhoenixIndexCodec; -import org.apache.phoenix.index.PhoenixTransactionalIndexer; import org.apache.phoenix.iterate.TableResultIterator; import org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus; import org.apache.phoenix.jdbc.PhoenixConnection; @@ -858,19 +857,12 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement && !SchemaUtil.isMetaTable(tableName) && !SchemaUtil.isStatsTable(tableName)) { if (isTransactional) { - if(!newDesc.hasCoprocessor(PhoenixTransactionalIndexer.class.getName())) { - builder.addCoprocessor(PhoenixTransactionalIndexer.class.getName(), null, priority, null); - } // For alter table, remove non transactional index coprocessor if(newDesc.hasCoprocessor(Indexer.class.getName())) { builder.removeCoprocessor(Indexer.class.getName()); } } else { if (!newDesc.hasCoprocessor(Indexer.class.getName())) { - // If exception on alter table to transition back to non transactional - if (newDesc.hasCoprocessor(PhoenixTransactionalIndexer.class.getName())) { - builder.removeCoprocessor(PhoenixTransactionalIndexer.class.getName()); - } Map<String, String> opts = Maps.newHashMapWithExpectedSize(1); opts.put(NonTxIndexBuilder.CODEC_CLASS_NAME_KEY, PhoenixIndexCodec.class.getName()); Indexer.enableIndexing(builder, PhoenixIndexBuilder.class, opts, priority); http://git-wip-us.apache.org/repos/asf/phoenix/blob/1123f96c/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java index 0b48376..633595a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java @@ -1037,27 +1037,15 @@ public class PTableImpl implements PTable { @Override public void delete() { newMutations(); - // we're using the Tephra column family delete marker here to prevent the translation - // of deletes to puts by the Tephra's TransactionProcessor - if (PTableImpl.this.isTransactional()) { - Put put = new Put(key); - if (families.isEmpty()) { - put.addColumn(SchemaUtil.getEmptyColumnFamily(PTableImpl.this), TransactionFactory.getTransactionFactory().getTransactionContext().getFamilyDeleteMarker(), ts, - HConstants.EMPTY_BYTE_ARRAY); - } else { - for (PColumnFamily colFamily : families) { - put.addColumn(colFamily.getName().getBytes(), TransactionFactory.getTransactionFactory().getTransactionContext().getFamilyDeleteMarker(), ts, - HConstants.EMPTY_BYTE_ARRAY); - } - } - deleteRow = put; + Delete delete = new Delete(key); + if (families.isEmpty()) { + delete.addFamily(SchemaUtil.getEmptyColumnFamily(PTableImpl.this), ts); } else { - Delete delete = new Delete(key); for (PColumnFamily colFamily : families) { - delete.addFamily(colFamily.getName().getBytes(), ts); + delete.addFamily(colFamily.getName().getBytes(), ts); } - deleteRow = delete; } + deleteRow = delete; if (isWALDisabled()) { deleteRow.setDurability(Durability.SKIP_WAL); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/1123f96c/phoenix-core/src/test/java/org/apache/phoenix/cache/TenantCacheTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/cache/TenantCacheTest.java b/phoenix-core/src/test/java/org/apache/phoenix/cache/TenantCacheTest.java index f4c83b2..c741f4e 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/cache/TenantCacheTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/cache/TenantCacheTest.java @@ -28,6 +28,7 @@ import java.sql.SQLException; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.phoenix.coprocessor.MetaDataProtocol; import org.apache.phoenix.coprocessor.ServerCachingProtocol.ServerCacheFactory; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.memory.GlobalMemoryManager; @@ -47,7 +48,7 @@ public class TenantCacheTest { TenantCacheImpl newTenantCache = new TenantCacheImpl(memoryManager, maxServerCacheTimeToLive); ImmutableBytesPtr cacheId = new ImmutableBytesPtr(Bytes.toBytes("a")); ImmutableBytesWritable cachePtr = new ImmutableBytesWritable(Bytes.toBytes("a")); - newTenantCache.addServerCache(cacheId, cachePtr, ByteUtil.EMPTY_BYTE_ARRAY, cacheFactory, true); + newTenantCache.addServerCache(cacheId, cachePtr, ByteUtil.EMPTY_BYTE_ARRAY, cacheFactory, true, MetaDataProtocol.PHOENIX_VERSION); assertEquals(maxBytes-1, memoryManager.getAvailableMemory()); newTenantCache.removeServerCache(cacheId); assertEquals(maxBytes, memoryManager.getAvailableMemory()); @@ -62,7 +63,7 @@ public class TenantCacheTest { TenantCacheImpl cache = new TenantCacheImpl(memoryManager, maxServerCacheTimeToLive, ticker); ImmutableBytesPtr cacheId1 = new ImmutableBytesPtr(Bytes.toBytes("a")); ImmutableBytesWritable cachePtr = new ImmutableBytesWritable(Bytes.toBytes("a")); - cache.addServerCache(cacheId1, cachePtr, ByteUtil.EMPTY_BYTE_ARRAY, cacheFactory, true); + cache.addServerCache(cacheId1, cachePtr, ByteUtil.EMPTY_BYTE_ARRAY, cacheFactory, true, MetaDataProtocol.PHOENIX_VERSION); assertEquals(maxBytes-1, memoryManager.getAvailableMemory()); ticker.time += (maxServerCacheTimeToLive + 1) * 1000000; cache.cleanUp(); @@ -79,7 +80,7 @@ public class TenantCacheTest { TenantCacheImpl cache = new TenantCacheImpl(memoryManager, maxServerCacheTimeToLive, ticker); ImmutableBytesPtr cacheId1 = new ImmutableBytesPtr(Bytes.toBytes("a")); ImmutableBytesWritable cachePtr = new ImmutableBytesWritable(Bytes.toBytes("a")); - cache.addServerCache(cacheId1, cachePtr, ByteUtil.EMPTY_BYTE_ARRAY, cacheFactory, true); + cache.addServerCache(cacheId1, cachePtr, ByteUtil.EMPTY_BYTE_ARRAY, cacheFactory, true, MetaDataProtocol.PHOENIX_VERSION); assertEquals(maxBytes-1, memoryManager.getAvailableMemory()); ticker.time += (maxServerCacheTimeToLive + 1) * 1000000; assertNull(cache.getServerCache(cacheId1)); @@ -94,12 +95,11 @@ public class TenantCacheTest { ManualTicker ticker = new ManualTicker(); TenantCacheImpl cache = new TenantCacheImpl(memoryManager, maxServerCacheTimeToLive, ticker); ImmutableBytesPtr cacheId1 = new ImmutableBytesPtr(Bytes.toBytes("a")); - ImmutableBytesPtr cacheId2 = new ImmutableBytesPtr(Bytes.toBytes("b")); ImmutableBytesWritable cachePtr = new ImmutableBytesWritable(Bytes.toBytes("12345678")); - cache.addServerCache(cacheId1, cachePtr, ByteUtil.EMPTY_BYTE_ARRAY, cacheFactory, true); + cache.addServerCache(cacheId1, cachePtr, ByteUtil.EMPTY_BYTE_ARRAY, cacheFactory, true, MetaDataProtocol.PHOENIX_VERSION); assertEquals(2, memoryManager.getAvailableMemory()); ticker.time += (maxServerCacheTimeToLive + 1) * 1000000; - cache.addServerCache(cacheId1, cachePtr, ByteUtil.EMPTY_BYTE_ARRAY, cacheFactory, true); + cache.addServerCache(cacheId1, cachePtr, ByteUtil.EMPTY_BYTE_ARRAY, cacheFactory, true, MetaDataProtocol.PHOENIX_VERSION); assertEquals(2, memoryManager.getAvailableMemory()); } @@ -124,7 +124,7 @@ public class TenantCacheTest { } @Override - public Closeable newCache(ImmutableBytesWritable cachePtr, byte[] txState, MemoryChunk chunk, boolean useProtoForIndexMaintainer) + public Closeable newCache(ImmutableBytesWritable cachePtr, byte[] txState, MemoryChunk chunk, boolean useProtoForIndexMaintainer, int clientVersion) throws SQLException { return chunk; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/1123f96c/phoenix-protocol/src/main/ServerCachingService.proto ---------------------------------------------------------------------- diff --git a/phoenix-protocol/src/main/ServerCachingService.proto b/phoenix-protocol/src/main/ServerCachingService.proto index 044c111..c059a1a 100644 --- a/phoenix-protocol/src/main/ServerCachingService.proto +++ b/phoenix-protocol/src/main/ServerCachingService.proto @@ -71,6 +71,7 @@ message AddServerCacheRequest { required ServerCacheFactory cacheFactory = 4; optional bytes txState = 5; optional bool hasProtoBufIndexMaintainer = 6; + optional int32 clientVersion = 7; } message AddServerCacheResponse {