Repository: phoenix Updated Branches: refs/heads/txn 0d44dd806 -> 826ebf5ce
Push transaction state to server for secondary indexing Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/826ebf5c Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/826ebf5c Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/826ebf5c Branch: refs/heads/txn Commit: 826ebf5ce741342ed1c758e594ca01bc9cb8e036 Parents: 0d44dd8 Author: James Taylor <[email protected]> Authored: Wed Mar 25 11:54:44 2015 -0700 Committer: James Taylor <[email protected]> Committed: Wed Mar 25 11:55:11 2015 -0700 ---------------------------------------------------------------------- .../phoenix/cache/IndexMetaDataCache.java | 22 ++++ .../apache/phoenix/cache/ServerCacheClient.java | 3 +- .../org/apache/phoenix/cache/TenantCache.java | 2 +- .../apache/phoenix/cache/TenantCacheImpl.java | 13 +- .../apache/phoenix/compile/DeleteCompiler.java | 8 +- .../apache/phoenix/compile/UpsertCompiler.java | 8 +- .../coprocessor/BaseScannerRegionObserver.java | 14 ++- .../phoenix/coprocessor/ScanRegionObserver.java | 11 +- .../coprocessor/ServerCachingEndpointImpl.java | 8 +- .../coprocessor/ServerCachingProtocol.java | 5 +- .../generated/ServerCachingProtos.java | 120 +++++++++++++++++-- .../apache/phoenix/execute/BaseQueryPlan.java | 17 ++- .../apache/phoenix/execute/MutationState.java | 5 +- .../hbase/index/builder/IndexBuilder.java | 9 ++ .../hbase/index/scanner/ScannerBuilder.java | 5 +- .../phoenix/index/IndexMetaDataCacheClient.java | 10 +- .../index/IndexMetaDataCacheFactory.java | 18 ++- .../phoenix/index/PhoenixIndexBuilder.java | 2 +- .../apache/phoenix/index/PhoenixIndexCodec.java | 53 ++++++-- .../apache/phoenix/jdbc/PhoenixConnection.java | 4 + .../apache/phoenix/join/HashCacheClient.java | 3 +- .../apache/phoenix/join/HashCacheFactory.java | 2 +- .../apache/phoenix/util/TransactionUtil.java | 8 +- .../src/main/ServerCachingService.proto | 1 + 24 files changed, 278 insertions(+), 73 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/826ebf5c/phoenix-core/src/main/java/org/apache/phoenix/cache/IndexMetaDataCache.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/IndexMetaDataCache.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/IndexMetaDataCache.java index 48949f1..359e08f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/cache/IndexMetaDataCache.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/IndexMetaDataCache.java @@ -19,10 +19,32 @@ package org.apache.phoenix.cache; import java.io.Closeable; +import java.io.IOException; +import java.util.Collections; import java.util.List; +import co.cask.tephra.Transaction; + import org.apache.phoenix.index.IndexMaintainer; public interface IndexMetaDataCache extends Closeable { + public static final IndexMetaDataCache EMPTY_INDEX_META_DATA_CACHE = new IndexMetaDataCache() { + + @Override + public void close() throws IOException { + } + + @Override + public List<IndexMaintainer> getIndexMaintainers() { + return Collections.emptyList(); + } + + @Override + public Transaction getTransaction() { + return null; + } + + }; public List<IndexMaintainer> getIndexMaintainers(); + public Transaction getTransaction(); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/826ebf5c/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java index 1233e1c..76bedbc 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java @@ -142,7 +142,7 @@ public class ServerCacheClient 
{ } - public ServerCache addServerCache(ScanRanges keyRanges, final ImmutableBytesWritable cachePtr, final ServerCacheFactory cacheFactory, final TableRef cacheUsingTableRef) throws SQLException { + public ServerCache addServerCache(ScanRanges keyRanges, final ImmutableBytesWritable cachePtr, final byte[] txState, final ServerCacheFactory cacheFactory, final TableRef cacheUsingTableRef) throws SQLException { ConnectionQueryServices services = connection.getQueryServices(); MemoryChunk chunk = services.getMemoryManager().allocate(cachePtr.getLength()); List<Closeable> closeables = new ArrayList<Closeable>(); @@ -201,6 +201,7 @@ public class ServerCacheClient { ServerCacheFactoryProtos.ServerCacheFactory.Builder svrCacheFactoryBuider = ServerCacheFactoryProtos.ServerCacheFactory.newBuilder(); svrCacheFactoryBuider.setClassName(cacheFactory.getClass().getName()); builder.setCacheFactory(svrCacheFactoryBuider.build()); + builder.setTxState(HBaseZeroCopyByteString.wrap(txState)); instance.addServerCache(controller, builder.build(), rpcCallback); if(controller.getFailedOn() != null) { throw controller.getFailedOn(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/826ebf5c/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCache.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCache.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCache.java index b968a9b..c7cd58f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCache.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCache.java @@ -37,6 +37,6 @@ import org.apache.phoenix.memory.MemoryManager; public interface TenantCache { MemoryManager getMemoryManager(); Closeable getServerCache(ImmutableBytesPtr cacheId); - Closeable addServerCache(ImmutableBytesPtr cacheId, ImmutableBytesWritable cachePtr, ServerCacheFactory cacheFactory) throws SQLException; + Closeable 
addServerCache(ImmutableBytesPtr cacheId, ImmutableBytesWritable cachePtr, byte[] txState, ServerCacheFactory cacheFactory) throws SQLException; void removeServerCache(ImmutableBytesPtr cacheId) throws SQLException; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/826ebf5c/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCacheImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCacheImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCacheImpl.java index 9005fa8..6ef7a6f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCacheImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCacheImpl.java @@ -23,14 +23,17 @@ import java.util.Collections; import java.util.concurrent.TimeUnit; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; - -import com.google.common.cache.*; import org.apache.phoenix.coprocessor.ServerCachingProtocol.ServerCacheFactory; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.memory.MemoryManager; import org.apache.phoenix.memory.MemoryManager.MemoryChunk; import org.apache.phoenix.util.Closeables; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.RemovalListener; +import com.google.common.cache.RemovalNotification; + /** * * Cache per tenant on server side. 
Tracks memory usage for each @@ -80,11 +83,11 @@ public class TenantCacheImpl implements TenantCache { } @Override - public Closeable addServerCache(ImmutableBytesPtr cacheId, ImmutableBytesWritable cachePtr, ServerCacheFactory cacheFactory) throws SQLException { - MemoryChunk chunk = this.getMemoryManager().allocate(cachePtr.getLength()); + public Closeable addServerCache(ImmutableBytesPtr cacheId, ImmutableBytesWritable cachePtr, byte[] txState, ServerCacheFactory cacheFactory) throws SQLException { + MemoryChunk chunk = this.getMemoryManager().allocate(cachePtr.getLength() + txState.length); boolean success = false; try { - Closeable element = cacheFactory.newCache(cachePtr, chunk); + Closeable element = cacheFactory.newCache(cachePtr, txState, chunk); getServerCaches().put(cacheId, element); success = true; return element; http://git-wip-us.apache.org/repos/asf/phoenix/blob/826ebf5c/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java index 273024c..a0369d5 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java @@ -75,8 +75,10 @@ import org.apache.phoenix.schema.SortOrder; import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.schema.tuple.Tuple; import org.apache.phoenix.schema.types.PLong; +import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.MetaDataUtil; import org.apache.phoenix.util.ScanUtil; +import org.apache.phoenix.util.TransactionUtil; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; @@ -490,12 +492,14 @@ public class DeleteCompiler { public MutationState execute() throws SQLException { // TODO: share this block of code 
with UPSERT SELECT ImmutableBytesWritable ptr = context.getTempPtr(); - tableRef.getTable().getIndexMaintainers(ptr, context.getConnection()); + PTable table = tableRef.getTable(); + table.getIndexMaintainers(ptr, context.getConnection()); + byte[] txState = table.isTransactional() ? TransactionUtil.encodeTxnState(connection.getTransactionContext().getCurrentTransaction()) : ByteUtil.EMPTY_BYTE_ARRAY; ServerCache cache = null; try { if (ptr.getLength() > 0) { IndexMetaDataCacheClient client = new IndexMetaDataCacheClient(connection, tableRef); - cache = client.addIndexMetadataCache(context.getScanRanges(), ptr); + cache = client.addIndexMetadataCache(context.getScanRanges(), ptr, txState); byte[] uuidValue = cache.getId(); context.getScan().setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/826ebf5c/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java index 5fd1602..e72b634 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java @@ -89,6 +89,7 @@ import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.MetaDataUtil; import org.apache.phoenix.util.ScanUtil; import org.apache.phoenix.util.SchemaUtil; +import org.apache.phoenix.util.TransactionUtil; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -607,12 +608,15 @@ public class UpsertCompiler { @Override public MutationState execute() throws SQLException { ImmutableBytesWritable ptr = context.getTempPtr(); - tableRef.getTable().getIndexMaintainers(ptr, context.getConnection()); + PTable table = tableRef.getTable(); + table.getIndexMaintainers(ptr, 
context.getConnection()); + byte[] txState = table.isTransactional() ? TransactionUtil.encodeTxnState(connection.getTransactionContext().getCurrentTransaction()) : ByteUtil.EMPTY_BYTE_ARRAY; + ServerCache cache = null; try { if (ptr.getLength() > 0) { IndexMetaDataCacheClient client = new IndexMetaDataCacheClient(connection, tableRef); - cache = client.addIndexMetadataCache(context.getScanRanges(), ptr); + cache = client.addIndexMetadataCache(context.getScanRanges(), ptr, txState); byte[] uuidValue = cache.getId(); scan.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/826ebf5c/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java index 25ac408..c4eb4f7 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java @@ -21,6 +21,8 @@ import java.io.IOException; import java.util.List; import java.util.Set; +import co.cask.tephra.Transaction; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CoprocessorEnvironment; @@ -85,6 +87,7 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver { public static final String EXPECTED_UPPER_REGION_KEY = "_ExpectedUpperRegionKey"; public static final String REVERSE_SCAN = "_ReverseScan"; public static final String ANALYZE_TABLE = "_ANALYZETABLE"; + public static final String TX_STATE = "_TxState"; /** * Attribute name used to pass custom annotations in Scans and Mutations (later). Custom annotations * are used to augment log lines emitted by Phoenix. 
See https://issues.apache.org/jira/browse/PHOENIX-1198. @@ -222,7 +225,7 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver { final byte[][] viewConstants, final TupleProjector projector, final ImmutableBytesWritable ptr) { return getWrappedScanner(c, s, null, null, offset, scan, dataColumns, tupleProjector, - dataRegion, indexMaintainer, viewConstants, null, null, projector, ptr); + dataRegion, indexMaintainer, null, viewConstants, null, null, projector, ptr); } /** @@ -230,13 +233,14 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver { * re-throws as DoNotRetryIOException to prevent needless retrying hanging the query * for 30 seconds. Unfortunately, until HBASE-7481 gets fixed, there's no way to do * the same from a custom filter. - * @param arrayFuncRefs * @param arrayKVRefs + * @param arrayFuncRefs * @param offset starting position in the rowkey. * @param scan * @param tupleProjector * @param dataRegion * @param indexMaintainer + * @param tx TODO * @param viewConstants */ protected RegionScanner getWrappedScanner(final ObserverContext<RegionCoprocessorEnvironment> c, @@ -244,9 +248,9 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver { final Expression[] arrayFuncRefs, final int offset, final Scan scan, final ColumnReference[] dataColumns, final TupleProjector tupleProjector, final HRegion dataRegion, final IndexMaintainer indexMaintainer, - final byte[][] viewConstants, final KeyValueSchema kvSchema, - final ValueBitSet kvSchemaBitSet, final TupleProjector projector, - final ImmutableBytesWritable ptr) { + Transaction tx, final byte[][] viewConstants, + final KeyValueSchema kvSchema, final ValueBitSet kvSchemaBitSet, + final TupleProjector projector, final ImmutableBytesWritable ptr) { return new RegionScanner() { @Override http://git-wip-us.apache.org/repos/asf/phoenix/blob/826ebf5c/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java 
---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java index ddde407..04275b2 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java @@ -26,7 +26,7 @@ import java.sql.SQLException; import java.util.List; import java.util.Set; -import com.google.common.collect.Sets; +import co.cask.tephra.Transaction; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HRegionInfo; @@ -58,8 +58,10 @@ import org.apache.phoenix.schema.tuple.Tuple; import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.ScanUtil; import org.apache.phoenix.util.ServerUtil; +import org.apache.phoenix.util.TransactionUtil; import com.google.common.collect.Lists; +import com.google.common.collect.Sets; /** @@ -190,6 +192,7 @@ public class ScanRegionObserver extends BaseScannerRegionObserver { HRegion dataRegion = null; IndexMaintainer indexMaintainer = null; byte[][] viewConstants = null; + Transaction tx = null; ColumnReference[] dataColumns = IndexUtil.deserializeDataTableColumnsToJoin(scan); if (dataColumns != null) { tupleProjector = IndexUtil.getTupleProjector(scan, dataColumns); @@ -198,14 +201,16 @@ public class ScanRegionObserver extends BaseScannerRegionObserver { List<IndexMaintainer> indexMaintainers = localIndexBytes == null ? 
null : IndexMaintainer.deserialize(localIndexBytes); indexMaintainer = indexMaintainers.get(0); viewConstants = IndexUtil.deserializeViewConstantsFromScan(scan); + byte[] txState = scan.getAttribute(BaseScannerRegionObserver.TX_STATE); + tx = TransactionUtil.decodeTxnState(txState); } final TupleProjector p = TupleProjector.deserializeProjectorFromScan(scan); final HashJoinInfo j = HashJoinInfo.deserializeHashJoinFromScan(scan); innerScanner = getWrappedScanner(c, innerScanner, arrayKVRefs, arrayFuncRefs, offset, scan, - dataColumns, tupleProjector, dataRegion, indexMaintainer, viewConstants, - kvSchema, kvSchemaBitSet, j == null ? p : null, ptr); + dataColumns, tupleProjector, dataRegion, indexMaintainer, tx, + viewConstants, kvSchema, kvSchemaBitSet, j == null ? p : null, ptr); final ImmutableBytesWritable tenantId = ScanUtil.getTenantId(scan); if (j != null) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/826ebf5c/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java index 2f31b08..9f3bdb4 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java @@ -26,18 +26,17 @@ import org.apache.hadoop.hbase.coprocessor.CoprocessorException; import org.apache.hadoop.hbase.coprocessor.CoprocessorService; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; - import org.apache.phoenix.cache.GlobalCache; import org.apache.phoenix.cache.TenantCache; -import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; - import 
org.apache.phoenix.coprocessor.ServerCachingProtocol.ServerCacheFactory; import org.apache.phoenix.coprocessor.generated.ServerCachingProtos.AddServerCacheRequest; import org.apache.phoenix.coprocessor.generated.ServerCachingProtos.AddServerCacheResponse; import org.apache.phoenix.coprocessor.generated.ServerCachingProtos.RemoveServerCacheRequest; import org.apache.phoenix.coprocessor.generated.ServerCachingProtos.RemoveServerCacheResponse; import org.apache.phoenix.coprocessor.generated.ServerCachingProtos.ServerCachingService; +import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.protobuf.ProtobufUtil; +import org.apache.phoenix.util.ByteUtil; import com.google.protobuf.RpcCallback; import com.google.protobuf.RpcController; @@ -66,6 +65,7 @@ public class ServerCachingEndpointImpl extends ServerCachingService implements C ImmutableBytesWritable cachePtr = org.apache.phoenix.protobuf.ProtobufUtil .toImmutableBytesWritable(request.getCachePtr()); + byte[] txState = request.hasTxState() ? 
request.getTxState().toByteArray() : ByteUtil.EMPTY_BYTE_ARRAY; try { @SuppressWarnings("unchecked") @@ -73,7 +73,7 @@ public class ServerCachingEndpointImpl extends ServerCachingService implements C (Class<ServerCacheFactory>) Class.forName(request.getCacheFactory().getClassName()); ServerCacheFactory cacheFactory = serverCacheFactoryClass.newInstance(); tenantCache.addServerCache(new ImmutableBytesPtr(request.getCacheId().toByteArray()), - cachePtr, cacheFactory); + cachePtr, txState, cacheFactory); } catch (Throwable e) { ProtobufUtil.setControllerException(controller, new IOException(e)); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/826ebf5c/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingProtocol.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingProtocol.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingProtocol.java index 4fdfe99..b201c8e 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingProtocol.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingProtocol.java @@ -36,18 +36,19 @@ import org.apache.phoenix.memory.MemoryManager.MemoryChunk; */ public interface ServerCachingProtocol { public static interface ServerCacheFactory extends Writable { - public Closeable newCache(ImmutableBytesWritable cachePtr, MemoryChunk chunk) throws SQLException; + public Closeable newCache(ImmutableBytesWritable cachePtr, byte[] txState, MemoryChunk chunk) throws SQLException; } /** * Add the cache to the region server cache. 
* @param tenantId the tenantId or null if not applicable * @param cacheId unique identifier of the cache * @param cachePtr pointer to the byte array of the cache + * @param txState TODO * @param cacheFactory factory that converts from byte array to object representation on the server side * @return true on success and otherwise throws * @throws SQLException */ - public boolean addServerCache(byte[] tenantId, byte[] cacheId, ImmutableBytesWritable cachePtr, ServerCacheFactory cacheFactory) throws SQLException; + public boolean addServerCache(byte[] tenantId, byte[] cacheId, ImmutableBytesWritable cachePtr, byte[] txState, ServerCacheFactory cacheFactory) throws SQLException; /** * Remove the cache from the region server cache. Called upon completion of * the operation when cache is no longer needed. http://git-wip-us.apache.org/repos/asf/phoenix/blob/826ebf5c/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/ServerCachingProtos.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/ServerCachingProtos.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/ServerCachingProtos.java index 69db21b..5ee1dfb 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/ServerCachingProtos.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/ServerCachingProtos.java @@ -699,6 +699,16 @@ public final class ServerCachingProtos { * <code>required .ServerCacheFactory cacheFactory = 4;</code> */ org.apache.phoenix.coprocessor.generated.ServerCacheFactoryProtos.ServerCacheFactoryOrBuilder getCacheFactoryOrBuilder(); + + // optional bytes txState = 5; + /** + * <code>optional bytes txState = 5;</code> + */ + boolean hasTxState(); + /** + * <code>optional bytes txState = 5;</code> + */ + com.google.protobuf.ByteString getTxState(); } /** * Protobuf type {@code AddServerCacheRequest} @@ -787,6 
+797,11 @@ public final class ServerCachingProtos { bitField0_ |= 0x00000008; break; } + case 42: { + bitField0_ |= 0x00000010; + txState_ = input.readBytes(); + break; + } } } } catch (com.google.protobuf.InvalidProtocolBufferException e) { @@ -903,11 +918,28 @@ public final class ServerCachingProtos { return cacheFactory_; } + // optional bytes txState = 5; + public static final int TXSTATE_FIELD_NUMBER = 5; + private com.google.protobuf.ByteString txState_; + /** + * <code>optional bytes txState = 5;</code> + */ + public boolean hasTxState() { + return ((bitField0_ & 0x00000010) == 0x00000010); + } + /** + * <code>optional bytes txState = 5;</code> + */ + public com.google.protobuf.ByteString getTxState() { + return txState_; + } + private void initFields() { tenantId_ = com.google.protobuf.ByteString.EMPTY; cacheId_ = com.google.protobuf.ByteString.EMPTY; cachePtr_ = org.apache.phoenix.coprocessor.generated.ServerCachingProtos.ImmutableBytesWritable.getDefaultInstance(); cacheFactory_ = org.apache.phoenix.coprocessor.generated.ServerCacheFactoryProtos.ServerCacheFactory.getDefaultInstance(); + txState_ = com.google.protobuf.ByteString.EMPTY; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -953,6 +985,9 @@ public final class ServerCachingProtos { if (((bitField0_ & 0x00000008) == 0x00000008)) { output.writeMessage(4, cacheFactory_); } + if (((bitField0_ & 0x00000010) == 0x00000010)) { + output.writeBytes(5, txState_); + } getUnknownFields().writeTo(output); } @@ -978,6 +1013,10 @@ public final class ServerCachingProtos { size += com.google.protobuf.CodedOutputStream .computeMessageSize(4, cacheFactory_); } + if (((bitField0_ & 0x00000010) == 0x00000010)) { + size += com.google.protobuf.CodedOutputStream + .computeBytesSize(5, txState_); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -1021,6 +1060,11 @@ public final class ServerCachingProtos { result = result && 
getCacheFactory() .equals(other.getCacheFactory()); } + result = result && (hasTxState() == other.hasTxState()); + if (hasTxState()) { + result = result && getTxState() + .equals(other.getTxState()); + } result = result && getUnknownFields().equals(other.getUnknownFields()); return result; @@ -1050,6 +1094,10 @@ public final class ServerCachingProtos { hash = (37 * hash) + CACHEFACTORY_FIELD_NUMBER; hash = (53 * hash) + getCacheFactory().hashCode(); } + if (hasTxState()) { + hash = (37 * hash) + TXSTATE_FIELD_NUMBER; + hash = (53 * hash) + getTxState().hashCode(); + } hash = (29 * hash) + getUnknownFields().hashCode(); memoizedHashCode = hash; return hash; @@ -1177,6 +1225,8 @@ public final class ServerCachingProtos { cacheFactoryBuilder_.clear(); } bitField0_ = (bitField0_ & ~0x00000008); + txState_ = com.google.protobuf.ByteString.EMPTY; + bitField0_ = (bitField0_ & ~0x00000010); return this; } @@ -1229,6 +1279,10 @@ public final class ServerCachingProtos { } else { result.cacheFactory_ = cacheFactoryBuilder_.build(); } + if (((from_bitField0_ & 0x00000010) == 0x00000010)) { + to_bitField0_ |= 0x00000010; + } + result.txState_ = txState_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -1257,6 +1311,9 @@ public final class ServerCachingProtos { if (other.hasCacheFactory()) { mergeCacheFactory(other.getCacheFactory()); } + if (other.hasTxState()) { + setTxState(other.getTxState()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -1610,6 +1667,42 @@ public final class ServerCachingProtos { return cacheFactoryBuilder_; } + // optional bytes txState = 5; + private com.google.protobuf.ByteString txState_ = com.google.protobuf.ByteString.EMPTY; + /** + * <code>optional bytes txState = 5;</code> + */ + public boolean hasTxState() { + return ((bitField0_ & 0x00000010) == 0x00000010); + } + /** + * <code>optional bytes txState = 5;</code> + */ + public com.google.protobuf.ByteString getTxState() { + return txState_; + } + /** + * 
<code>optional bytes txState = 5;</code> + */ + public Builder setTxState(com.google.protobuf.ByteString value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000010; + txState_ = value; + onChanged(); + return this; + } + /** + * <code>optional bytes txState = 5;</code> + */ + public Builder clearTxState() { + bitField0_ = (bitField0_ & ~0x00000010); + txState_ = getDefaultInstance().getTxState(); + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:AddServerCacheRequest) } @@ -3383,20 +3476,21 @@ public final class ServerCachingProtos { "\n\032ServerCachingService.proto\032\030ServerCach" + "eFactory.proto\"K\n\026ImmutableBytesWritable" + "\022\021\n\tbyteArray\030\001 \002(\014\022\016\n\006offset\030\002 \002(\005\022\016\n\006l" + - "ength\030\003 \002(\005\"\220\001\n\025AddServerCacheRequest\022\020\n" + + "ength\030\003 \002(\005\"\241\001\n\025AddServerCacheRequest\022\020\n" + "\010tenantId\030\001 \001(\014\022\017\n\007cacheId\030\002 \002(\014\022)\n\010cach" + "ePtr\030\003 \002(\0132\027.ImmutableBytesWritable\022)\n\014c" + - "acheFactory\030\004 \002(\0132\023.ServerCacheFactory\"(" + - "\n\026AddServerCacheResponse\022\016\n\006return\030\001 \002(\010" + - "\"=\n\030RemoveServerCacheRequest\022\020\n\010tenantId" + - "\030\001 \001(\014\022\017\n\007cacheId\030\002 \002(\014\"+\n\031RemoveServerC", - "acheResponse\022\016\n\006return\030\001 \002(\0102\245\001\n\024ServerC" + - "achingService\022A\n\016addServerCache\022\026.AddSer" + - "verCacheRequest\032\027.AddServerCacheResponse" + - "\022J\n\021removeServerCache\022\031.RemoveServerCach" + - "eRequest\032\032.RemoveServerCacheResponseBG\n(" + - "org.apache.phoenix.coprocessor.generated" + - "B\023ServerCachingProtosH\001\210\001\001\240\001\001" + "acheFactory\030\004 \002(\0132\023.ServerCacheFactory\022\017" + + "\n\007txState\030\005 \001(\014\"(\n\026AddServerCacheRespons" + + "e\022\016\n\006return\030\001 
\002(\010\"=\n\030RemoveServerCacheRe" + + "quest\022\020\n\010tenantId\030\001 \001(\014\022\017\n\007cacheId\030\002 \002(\014", + "\"+\n\031RemoveServerCacheResponse\022\016\n\006return\030" + + "\001 \002(\0102\245\001\n\024ServerCachingService\022A\n\016addSer" + + "verCache\022\026.AddServerCacheRequest\032\027.AddSe" + + "rverCacheResponse\022J\n\021removeServerCache\022\031" + + ".RemoveServerCacheRequest\032\032.RemoveServer" + + "CacheResponseBG\n(org.apache.phoenix.copr" + + "ocessor.generatedB\023ServerCachingProtosH\001" + + "\210\001\001\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -3414,7 +3508,7 @@ public final class ServerCachingProtos { internal_static_AddServerCacheRequest_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_AddServerCacheRequest_descriptor, - new java.lang.String[] { "TenantId", "CacheId", "CachePtr", "CacheFactory", }); + new java.lang.String[] { "TenantId", "CacheId", "CachePtr", "CacheFactory", "TxState", }); internal_static_AddServerCacheResponse_descriptor = getDescriptor().getMessageTypes().get(2); internal_static_AddServerCacheResponse_fieldAccessorTable = new http://git-wip-us.apache.org/repos/asf/phoenix/blob/826ebf5c/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java index 94233c8..d96e339 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java @@ -66,6 +66,7 @@ import org.apache.phoenix.util.LogUtil; import org.apache.phoenix.util.SQLCloseable; import 
org.apache.phoenix.util.SQLCloseables; import org.apache.phoenix.util.ScanUtil; +import org.apache.phoenix.util.TransactionUtil; import org.cloudera.htrace.TraceScope; import com.google.common.collect.Lists; @@ -205,13 +206,15 @@ public abstract class BaseQueryPlan implements QueryPlan { KeyValueSchema schema = ProjectedColumnExpression.buildSchema(dataColumns); // Set key value schema of the data columns. serializeSchemaIntoScan(scan, schema); - String parentSchema = context.getCurrentTable().getTable().getParentSchemaName().getString(); - String parentTable = context.getCurrentTable().getTable().getParentTableName().getString(); + PTable parentTable = context.getCurrentTable().getTable(); + String parentSchemaName = parentTable.getParentSchemaName().getString(); + String parentTableName = parentTable.getParentTableName().getString(); final ParseNodeFactory FACTORY = new ParseNodeFactory(); + // TODO: is it necessary to re-resolve the table? TableRef dataTableRef = FromCompiler.getResolver( - FACTORY.namedTable(null, TableName.create(parentSchema, parentTable)), - context.getConnection()).resolveTable(parentSchema, parentTable); + FACTORY.namedTable(null, TableName.create(parentSchemaName, parentTableName)), + context.getConnection()).resolveTable(parentSchemaName, parentTableName); PTable dataTable = dataTableRef.getTable(); // Set index maintainer of the local index. serializeIndexMaintainerIntoScan(scan, dataTable); @@ -248,7 +251,7 @@ public abstract class BaseQueryPlan implements QueryPlan { return (scope.getSpan() != null) ? 
new TracingIterator(scope, iterator) : iterator; } - private void serializeIndexMaintainerIntoScan(Scan scan, PTable dataTable) { + private void serializeIndexMaintainerIntoScan(Scan scan, PTable dataTable) throws SQLException { PName name = context.getCurrentTable().getTable().getName(); List<PTable> indexes = Lists.newArrayListWithExpectedSize(1); for (PTable index : dataTable.getIndexes()) { @@ -260,6 +263,10 @@ public abstract class BaseQueryPlan implements QueryPlan { ImmutableBytesWritable ptr = new ImmutableBytesWritable(); IndexMaintainer.serialize(dataTable, ptr, indexes, context.getConnection()); scan.setAttribute(BaseScannerRegionObserver.LOCAL_INDEX_BUILD, ByteUtil.copyKeyBytesIfNecessary(ptr)); + if (dataTable.isTransactional()) { + PhoenixConnection conn = context.getConnection(); + scan.setAttribute(BaseScannerRegionObserver.TX_STATE, TransactionUtil.encodeTxnState(conn.getTransactionContext().getCurrentTransaction())); + } } private void serializeViewConstantsIntoScan(Scan scan, PTable dataTable) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/826ebf5c/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java index edaab9e..8915534 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java @@ -394,9 +394,10 @@ public class MutationState implements SQLCloseable { if (hasIndexMaintainers && isDataTable) { byte[] attribValue = null; byte[] uuidValue; - if (IndexMetaDataCacheClient.useIndexMetadataCache(connection, mutations, tempPtr.getLength())) { + byte[] txState = table.isTransactional() ? 
TransactionUtil.encodeTxnState(connection.getTransactionContext().getCurrentTransaction()) : ByteUtil.EMPTY_BYTE_ARRAY; + if (IndexMetaDataCacheClient.useIndexMetadataCache(connection, mutations, tempPtr.getLength() + txState.length)) { IndexMetaDataCacheClient client = new IndexMetaDataCacheClient(connection, tableRef); - cache = client.addIndexMetadataCache(mutations, tempPtr); + cache = client.addIndexMetadataCache(mutations, tempPtr, txState); child.addTimelineAnnotation("Updated index metadata cache"); uuidValue = cache.getId(); // If we haven't retried yet, retry for this case only, as it's possible that http://git-wip-us.apache.org/repos/asf/phoenix/blob/826ebf5c/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java index 1c9782e..b91a52a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java @@ -68,6 +68,15 @@ public interface IndexBuilder extends Stoppable { * @return a Map of the mutations to make -> target index table name * @throws IOException on failure */ + /* TODO: + Create BaseIndexBuilder with everything except getIndexUpdate(). + Derive two concrete classes: NonTxIndexBuilder and TxIndexBuilder. + NonTxIndexBuilder will be current impl of this method. + TxIndexBuilder will use a scan with skipScan over TxAwareHBase to find the latest values. 
+ Conditionally don't do WALEdit stuff for txnal table (ensure Phoenix/HBase tolerates index WAL edit info not being there) + Noop Failure mode + */ + public Collection<Pair<Mutation, byte[]>> getIndexUpdate(Mutation mutation) throws IOException; /** http://git-wip-us.apache.org/repos/asf/phoenix/blob/826ebf5c/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/ScannerBuilder.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/ScannerBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/ScannerBuilder.java index 575779a..32e4d84 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/ScannerBuilder.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/ScannerBuilder.java @@ -32,8 +32,6 @@ import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.FilterList; import org.apache.hadoop.hbase.filter.QualifierFilter; import org.apache.hadoop.hbase.util.Bytes; - -import com.google.common.collect.Lists; import org.apache.phoenix.hbase.index.covered.KeyValueStore; import org.apache.phoenix.hbase.index.covered.filter.ApplyAndFilterDeletesFilter; import org.apache.phoenix.hbase.index.covered.filter.ColumnTrackingNextLargestTimestampFilter; @@ -41,6 +39,8 @@ import org.apache.phoenix.hbase.index.covered.update.ColumnReference; import org.apache.phoenix.hbase.index.covered.update.ColumnTracker; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; +import com.google.common.collect.Lists; + /** * */ @@ -57,6 +57,7 @@ public class ScannerBuilder { public Scanner buildIndexedColumnScanner(Collection<? 
extends ColumnReference> indexedColumns, ColumnTracker tracker, long ts) { + // TODO: This needs to use some form of the filter that Tephra has when transactional Filter columnFilters = getColumnFilters(indexedColumns); FilterList filters = new FilterList(Lists.newArrayList(columnFilters)); http://git-wip-us.apache.org/repos/asf/phoenix/blob/826ebf5c/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheClient.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheClient.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheClient.java index 70ddc86..c1135bc 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheClient.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheClient.java @@ -67,30 +67,32 @@ public class IndexMetaDataCacheClient { /** * Send the index metadata cahce to all region servers for regions that will handle the mutations. + * @param txState encoded transaction state sent to the server together with the index metadata; an empty array when the table is not transactional * @return client-side {@link ServerCache} representing the added index metadata cache * @throws SQLException * @throws MaxServerCacheSizeExceededException if size of hash cache exceeds max allowed * size */ - public ServerCache addIndexMetadataCache(List<Mutation> mutations, ImmutableBytesWritable ptr) throws SQLException { + public ServerCache addIndexMetadataCache(List<Mutation> mutations, ImmutableBytesWritable ptr, byte[] txState) throws SQLException { /** * Serialize and compress hashCacheTable */ - return serverCache.addServerCache(ScanUtil.newScanRanges(mutations), ptr, new IndexMetaDataCacheFactory(), cacheUsingTableRef); + return serverCache.addServerCache(ScanUtil.newScanRanges(mutations), ptr, txState, new IndexMetaDataCacheFactory(), cacheUsingTableRef); } /** * Send the index metadata cahce to all region servers for regions that will handle the mutations. 
+ * @param txState encoded transaction state passed through to the server cache alongside the index metadata * @return client-side {@link ServerCache} representing the added index metadata cache * @throws SQLException * @throws MaxServerCacheSizeExceededException if size of hash cache exceeds max allowed * size */ - public ServerCache addIndexMetadataCache(ScanRanges ranges, ImmutableBytesWritable ptr) throws SQLException { + public ServerCache addIndexMetadataCache(ScanRanges ranges, ImmutableBytesWritable ptr, byte[] txState) throws SQLException { /** * Serialize and compress hashCacheTable */ - return serverCache.addServerCache(ranges, ptr, new IndexMetaDataCacheFactory(), cacheUsingTableRef); + return serverCache.addServerCache(ranges, ptr, txState, new IndexMetaDataCacheFactory(), cacheUsingTableRef); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/826ebf5c/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheFactory.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheFactory.java index 488db44..8b1ee18 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheFactory.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheFactory.java @@ -24,11 +24,14 @@ import java.io.IOException; import java.sql.SQLException; import java.util.List; +import co.cask.tephra.Transaction; + import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.phoenix.cache.IndexMetaDataCache; import org.apache.phoenix.coprocessor.ServerCachingProtocol.ServerCacheFactory; import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder; import org.apache.phoenix.memory.MemoryManager.MemoryChunk; +import org.apache.phoenix.util.TransactionUtil; public class IndexMetaDataCacheFactory implements ServerCacheFactory { public IndexMetaDataCacheFactory() { @@ -43,10 +46,16 @@ public class 
IndexMetaDataCacheFactory implements ServerCacheFactory { } @Override - public Closeable newCache (ImmutableBytesWritable cachePtr, final MemoryChunk chunk) throws SQLException { + public Closeable newCache (ImmutableBytesWritable cachePtr, byte[] txState, final MemoryChunk chunk) throws SQLException { // just use the standard keyvalue builder - this doesn't really need to be fast - final List<IndexMaintainer> maintainers = + final List<IndexMaintainer> maintainers = IndexMaintainer.deserialize(cachePtr, GenericKeyValueBuilder.INSTANCE); + final Transaction txn; + try { + txn = TransactionUtil.decodeTxnState(txState); + } catch (IOException e) { + throw new SQLException(e); + } return new IndexMetaDataCache() { @Override @@ -58,6 +67,11 @@ public class IndexMetaDataCacheFactory implements ServerCacheFactory { public List<IndexMaintainer> getIndexMaintainers() { return maintainers; } + + @Override + public Transaction getTransaction() { + return txn; + } }; } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/826ebf5c/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java index b89c807..eb117bf 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java @@ -58,7 +58,7 @@ public class PhoenixIndexBuilder extends CoveredColumnsIndexBuilder { for (int i = 0; i < miniBatchOp.size(); i++) { Mutation m = miniBatchOp.getOperation(i); keys.add(PVarbinary.INSTANCE.getKeyRange(m.getRow())); - List<IndexMaintainer> indexMaintainers = getCodec().getIndexMaintainers(m.getAttributesMap()); + List<IndexMaintainer> indexMaintainers = getCodec().getIndexMetaData(m.getAttributesMap()).getIndexMaintainers(); 
for(IndexMaintainer indexMaintainer: indexMaintainers) { if (indexMaintainer.isImmutableRows() && indexMaintainer.isLocalIndex()) continue; http://git-wip-us.apache.org/repos/asf/phoenix/blob/826ebf5c/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java index 99e26d1..8b507b6 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java @@ -23,10 +23,10 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import co.cask.tephra.Transaction; + import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Mutation; -import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Pair; @@ -34,6 +34,7 @@ import org.apache.phoenix.cache.GlobalCache; import org.apache.phoenix.cache.IndexMetaDataCache; import org.apache.phoenix.cache.ServerCacheClient; import org.apache.phoenix.cache.TenantCache; +import org.apache.phoenix.coprocessor.BaseScannerRegionObserver; import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.exception.SQLExceptionInfo; import org.apache.phoenix.hbase.index.ValueGetter; @@ -50,6 +51,7 @@ import org.apache.phoenix.hbase.index.write.IndexWriter; import org.apache.phoenix.util.MetaDataUtil; import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.ServerUtil; +import org.apache.phoenix.util.TransactionUtil; import com.google.common.collect.Lists; @@ -78,18 +80,47 @@ public class PhoenixIndexCodec extends 
BaseIndexCodec { this.kvBuilder = GenericKeyValueBuilder.INSTANCE; } - List<IndexMaintainer> getIndexMaintainers(Map<String, byte[]> attributes) throws IOException{ + boolean hasIndexMaintainers(Map<String, byte[]> attributes) { if (attributes == null) { - return Collections.emptyList(); + return false; } byte[] uuid = attributes.get(INDEX_UUID); if (uuid == null) { - return Collections.emptyList(); + return false; + } + return true; + } + + IndexMetaDataCache getIndexMetaData(Map<String, byte[]> attributes) throws IOException{ + if (attributes == null) { + return IndexMetaDataCache.EMPTY_INDEX_META_DATA_CACHE; + } + byte[] uuid = attributes.get(INDEX_UUID); + if (uuid == null) { + return IndexMetaDataCache.EMPTY_INDEX_META_DATA_CACHE; } byte[] md = attributes.get(INDEX_MD); - List<IndexMaintainer> indexMaintainers; + byte[] txState = attributes.get(BaseScannerRegionObserver.TX_STATE); if (md != null) { - indexMaintainers = IndexMaintainer.deserialize(md); + final List<IndexMaintainer> indexMaintainers = IndexMaintainer.deserialize(md); + final Transaction txn = TransactionUtil.decodeTxnState(txState); + return new IndexMetaDataCache() { + + @Override + public void close() throws IOException { + } + + @Override + public List<IndexMaintainer> getIndexMaintainers() { + return indexMaintainers; + } + + @Override + public Transaction getTransaction() { + return txn; + } + + }; } else { byte[] tenantIdBytes = attributes.get(PhoenixRuntime.TENANT_ID_ATTRIB); ImmutableBytesWritable tenantId = @@ -103,10 +134,9 @@ public class PhoenixIndexCodec extends BaseIndexCodec { .setMessage(msg).build().buildException(); ServerUtil.throwIOException("Index update failed", e); // will not return } - indexMaintainers = indexCache.getIndexMaintainers(); + return indexCache; } - return indexMaintainers; } @Override @@ -127,7 +157,8 @@ public class PhoenixIndexCodec extends BaseIndexCodec { * @throws IOException */ private Iterable<IndexUpdate> getIndexUpdates(TableState state, boolean 
upsert) throws IOException { - List<IndexMaintainer> indexMaintainers = getIndexMaintainers(state.getUpdateAttributes()); + IndexMetaDataCache indexCache = getIndexMetaData(state.getUpdateAttributes()); + List<IndexMaintainer> indexMaintainers = indexCache.getIndexMaintainers(); if (indexMaintainers.isEmpty()) { return Collections.emptyList(); } @@ -187,7 +218,7 @@ public class PhoenixIndexCodec extends BaseIndexCodec { @Override public boolean isEnabled(Mutation m) throws IOException { - return !getIndexMaintainers(m.getAttributesMap()).isEmpty(); + return hasIndexMaintainers(m.getAttributesMap()); /* enabled only when the mutation carries an INDEX_UUID attribute, matching the previous non-empty-maintainers check */ } @Override http://git-wip-us.apache.org/repos/asf/phoenix/blob/826ebf5c/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java index dffd7c4..a646d96 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java @@ -439,12 +439,16 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd public void startTransaction() throws SQLException { if (txContext == null) { + boolean success = false; try { TransactionSystemClient txServiceClient = this.getQueryServices().getTransactionSystemClient(); this.txContext = new TransactionContext(txServiceClient); txContext.start(); + success = true; } catch (TransactionFailureException e) { throw new SQLException(e); // TODO: error code + } finally { + if (!success) endTransaction(); } } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/826ebf5c/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java ---------------------------------------------------------------------- diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java index f13b28e..e8b3389 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java @@ -39,6 +39,7 @@ import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.schema.tuple.Tuple; import org.apache.phoenix.schema.types.PDataType; +import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.ServerUtil; import org.apache.phoenix.util.TrustedByteArrayOutputStream; import org.apache.phoenix.util.TupleUtil; @@ -80,7 +81,7 @@ public class HashCacheClient { */ ImmutableBytesWritable ptr = new ImmutableBytesWritable(); serialize(ptr, iterator, estimatedSize, onExpressions, singleValueOnly, keyRangeRhsExpression, keyRangeRhsValues); - return serverCache.addServerCache(keyRanges, ptr, new HashCacheFactory(), cacheUsingTableRef); + return serverCache.addServerCache(keyRanges, ptr, ByteUtil.EMPTY_BYTE_ARRAY, new HashCacheFactory(), cacheUsingTableRef); } private void serialize(ImmutableBytesWritable ptr, ResultIterator iterator, long estimatedSize, List<Expression> onExpressions, boolean singleValueOnly, Expression keyRangeRhsExpression, List<Expression> keyRangeRhsValues) throws SQLException { http://git-wip-us.apache.org/repos/asf/phoenix/blob/826ebf5c/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java index 8cae51a..3072736 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java @@ -56,7 +56,7 @@ 
public class HashCacheFactory implements ServerCacheFactory { } @Override - public Closeable newCache(ImmutableBytesWritable cachePtr, MemoryChunk chunk) throws SQLException { + public Closeable newCache(ImmutableBytesWritable cachePtr, byte[] txState, MemoryChunk chunk) throws SQLException { try { // This reads the uncompressed length from the front of the compressed input int uncompressedLen = Snappy.getUncompressedLength(cachePtr.get(), cachePtr.getOffset()); http://git-wip-us.apache.org/repos/asf/phoenix/blob/826ebf5c/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java index 34bba47..6058711 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/TransactionUtil.java @@ -45,12 +45,8 @@ public class TransactionUtil { } } - public static Transaction decodeTxnState(byte[] txnBytes) throws SQLException { - try { - return codec.decode(txnBytes); - } catch (IOException e) { - throw new SQLException(e); - } + public static Transaction decodeTxnState(byte[] txnBytes) throws IOException { + return txnBytes == null ? 
null : codec.decode(txnBytes); } public static SQLException getSQLException(TransactionFailureException e) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/826ebf5c/phoenix-protocol/src/main/ServerCachingService.proto ---------------------------------------------------------------------- diff --git a/phoenix-protocol/src/main/ServerCachingService.proto b/phoenix-protocol/src/main/ServerCachingService.proto index 63aeef9..dcbaabd 100644 --- a/phoenix-protocol/src/main/ServerCachingService.proto +++ b/phoenix-protocol/src/main/ServerCachingService.proto @@ -37,6 +37,7 @@ message AddServerCacheRequest { required bytes cacheId = 2; required ImmutableBytesWritable cachePtr = 3; required ServerCacheFactory cacheFactory = 4; + optional bytes txState = 5; } message AddServerCacheResponse {
