http://git-wip-us.apache.org/repos/asf/phoenix/blob/2f2f6cc5/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java index 16f6e39..09545da 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java @@ -221,6 +221,8 @@ import org.apache.phoenix.schema.types.PTinyint; import org.apache.phoenix.schema.types.PUnsignedTinyint; import org.apache.phoenix.schema.types.PVarbinary; import org.apache.phoenix.schema.types.PVarchar; +import org.apache.phoenix.transaction.PhoenixTransactionContext; +import org.apache.phoenix.transaction.TransactionFactory; import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.Closeables; import org.apache.phoenix.util.ConfigUtil; @@ -235,11 +237,6 @@ import org.apache.phoenix.util.ReadOnlyProps; import org.apache.phoenix.util.SchemaUtil; import org.apache.phoenix.util.ServerUtil; import org.apache.phoenix.util.UpgradeUtil; -import org.apache.tephra.TransactionSystemClient; -import org.apache.tephra.TxConstants; -import org.apache.tephra.distributed.PooledClientProvider; -import org.apache.tephra.distributed.TransactionServiceClient; -import org.apache.tephra.zookeeper.TephraZKClientService; import org.apache.twill.discovery.ZKDiscoveryService; import org.apache.twill.zookeeper.RetryStrategies; import org.apache.twill.zookeeper.ZKClientService; @@ -291,7 +288,6 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement private HConnection connection; private ZKClientService txZKClientService; - private TransactionServiceClient txServiceClient; private volatile boolean initialized; private 
volatile int nSequenceSaltBuckets; @@ -400,32 +396,8 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } - @Override - public TransactionSystemClient getTransactionSystemClient() { - return txServiceClient; - } - private void initTxServiceClient() { - String zkQuorumServersString = this.getProps().get(TxConstants.Service.CFG_DATA_TX_ZOOKEEPER_QUORUM); - if (zkQuorumServersString==null) { - zkQuorumServersString = connectionInfo.getZookeeperQuorum()+":"+connectionInfo.getPort(); - } - - int timeOut = props.getInt(HConstants.ZK_SESSION_TIMEOUT, HConstants.DEFAULT_ZK_SESSION_TIMEOUT); - // Create instance of the tephra zookeeper client - txZKClientService = ZKClientServices.delegate( - ZKClients.reWatchOnExpire( - ZKClients.retryOnFailure( - new TephraZKClientService(zkQuorumServersString, timeOut, null, - ArrayListMultimap.<String, byte[]>create()), - RetryStrategies.exponentialDelay(500, 2000, TimeUnit.MILLISECONDS)) - ) - ); - txZKClientService.startAndWait(); - ZKDiscoveryService zkDiscoveryService = new ZKDiscoveryService(txZKClientService); - PooledClientProvider pooledClientProvider = new PooledClientProvider( - config, zkDiscoveryService); - this.txServiceClient = new TransactionServiceClient(config,pooledClientProvider); + txZKClientService = TransactionFactory.getTransactionFactory().getTransactionContext().setTransactionClient(config, props, connectionInfo); } private void openConnection() throws SQLException { @@ -868,7 +840,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } boolean isTransactional = Boolean.TRUE.equals(tableProps.get(TableProperty.TRANSACTIONAL.name())) || - Boolean.TRUE.equals(tableProps.get(TxConstants.READ_NON_TX_DATA)); // For ALTER TABLE + Boolean.TRUE.equals(tableProps.get(PhoenixTransactionContext.READ_NON_TX_DATA)); // For ALTER TABLE // TODO: better encapsulation for this // Since indexes can't have indexes, don't install our indexing coprocessor for indexes. 
// Also don't install on the SYSTEM.CATALOG and SYSTEM.STATS table because we use @@ -1130,7 +1102,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement // If mapping an existing table as transactional, set property so that existing // data is correctly read. if (willBeTx) { - newDesc.setValue(TxConstants.READ_NON_TX_DATA, Boolean.TRUE.toString()); + newDesc.setValue(PhoenixTransactionContext.READ_NON_TX_DATA, Boolean.TRUE.toString()); } else { // If we think we're creating a non transactional table when it's already // transactional, don't allow. @@ -1139,7 +1111,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement .setSchemaName(SchemaUtil.getSchemaNameFromFullName(tableName)) .setTableName(SchemaUtil.getTableNameFromFullName(tableName)).build().buildException(); } - newDesc.remove(TxConstants.READ_NON_TX_DATA); + newDesc.remove(PhoenixTransactionContext.READ_NON_TX_DATA); } if (existingDesc.equals(newDesc)) { return null; // Indicate that no metadata was changed @@ -1759,7 +1731,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement origTableDescriptors = Sets.newHashSetWithExpectedSize(3 + table.getIndexes().size()); tableDescriptors.add(tableDescriptor); origTableDescriptors.add(origTableDescriptor); - nonTxToTx = Boolean.TRUE.equals(tableProps.get(TxConstants.READ_NON_TX_DATA)); + nonTxToTx = Boolean.TRUE.equals(tableProps.get(PhoenixTransactionContext.READ_NON_TX_DATA)); /* * If the table was transitioned from non transactional to transactional, we need * to also transition the index tables. 
@@ -1869,7 +1841,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement indexTableProps = Collections.<String,Object>emptyMap(); } else { indexTableProps = Maps.newHashMapWithExpectedSize(1); - indexTableProps.put(TxConstants.READ_NON_TX_DATA, Boolean.valueOf(txValue)); + indexTableProps.put(PhoenixTransactionContext.READ_NON_TX_DATA, Boolean.valueOf(txValue)); } for (PTable index : table.getIndexes()) { HTableDescriptor indexDescriptor = admin.getTableDescriptor(index.getPhysicalName().getBytes()); @@ -1882,7 +1854,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement HColumnDescriptor indexColDescriptor = indexDescriptor.getFamily(indexFamilyName); HColumnDescriptor tableColDescriptor = tableDescriptor.getFamily(dataFamilyName); indexColDescriptor.setMaxVersions(tableColDescriptor.getMaxVersions()); - indexColDescriptor.setValue(TxConstants.PROPERTY_TTL, tableColDescriptor.getValue(TxConstants.PROPERTY_TTL)); + indexColDescriptor.setValue(PhoenixTransactionContext.PROPERTY_TTL, tableColDescriptor.getValue(PhoenixTransactionContext.PROPERTY_TTL)); } else { for (PColumnFamily family : index.getColumnFamilies()) { byte[] familyName = family.getName().getBytes(); @@ -1890,7 +1862,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement HColumnDescriptor indexColDescriptor = indexDescriptor.getFamily(familyName); HColumnDescriptor tableColDescriptor = tableDescriptor.getFamily(familyName); indexColDescriptor.setMaxVersions(tableColDescriptor.getMaxVersions()); - indexColDescriptor.setValue(TxConstants.PROPERTY_TTL, tableColDescriptor.getValue(TxConstants.PROPERTY_TTL)); + indexColDescriptor.setValue(PhoenixTransactionContext.PROPERTY_TTL, tableColDescriptor.getValue(PhoenixTransactionContext.PROPERTY_TTL)); } } setTransactional(indexDescriptor, index.getType(), txValue, indexTableProps); @@ -1926,7 +1898,7 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement HColumnDescriptor indexColDescriptor = indexDescriptor.getFamily(familyName); HColumnDescriptor tableColDescriptor = tableDescriptor.getFamily(familyName); indexColDescriptor.setMaxVersions(tableColDescriptor.getMaxVersions()); - indexColDescriptor.setValue(TxConstants.PROPERTY_TTL, tableColDescriptor.getValue(TxConstants.PROPERTY_TTL)); + indexColDescriptor.setValue(PhoenixTransactionContext.PROPERTY_TTL, tableColDescriptor.getValue(PhoenixTransactionContext.PROPERTY_TTL)); } else { for (PColumnFamily family : table.getColumnFamilies()) { byte[] familyName = family.getName().getBytes(); @@ -1934,7 +1906,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement if (indexColDescriptor != null) { HColumnDescriptor tableColDescriptor = tableDescriptor.getFamily(familyName); indexColDescriptor.setMaxVersions(tableColDescriptor.getMaxVersions()); - indexColDescriptor.setValue(TxConstants.PROPERTY_TTL, tableColDescriptor.getValue(TxConstants.PROPERTY_TTL)); + indexColDescriptor.setValue(PhoenixTransactionContext.PROPERTY_TTL, tableColDescriptor.getValue(PhoenixTransactionContext.PROPERTY_TTL)); } } } @@ -1962,9 +1934,9 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } private void setTransactional(HTableDescriptor tableDescriptor, PTableType tableType, String txValue, Map<String, Object> tableProps) throws SQLException { if (txValue == null) { - tableDescriptor.remove(TxConstants.READ_NON_TX_DATA); + tableDescriptor.remove(PhoenixTransactionContext.READ_NON_TX_DATA); } else { - tableDescriptor.setValue(TxConstants.READ_NON_TX_DATA, txValue); + tableDescriptor.setValue(PhoenixTransactionContext.READ_NON_TX_DATA, txValue); } this.addCoprocessors(tableDescriptor.getName(), tableDescriptor, tableType, tableProps); } @@ -2010,7 +1982,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement commonFamilyProps.put(propName, prop.getSecond()); } else if 
(propName.equals(PhoenixDatabaseMetaData.TRANSACTIONAL) && Boolean.TRUE.equals(propValue)) { willBeTransactional = isOrWillBeTransactional = true; - tableProps.put(TxConstants.READ_NON_TX_DATA, propValue); + tableProps.put(PhoenixTransactionContext.READ_NON_TX_DATA, propValue); } } else { if (MetaDataUtil.isHColumnProperty(propName)) { @@ -2179,10 +2151,10 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement if (props == null) { props = new HashMap<String, Object>(); } - props.put(TxConstants.PROPERTY_TTL, ttl); + props.put(PhoenixTransactionContext.PROPERTY_TTL, ttl); // Remove HBase TTL if we're not transitioning an existing table to become transactional // or if the existing transactional table wasn't originally non transactional. - if (!willBeTransactional && !Boolean.valueOf(newTableDescriptor.getValue(TxConstants.READ_NON_TX_DATA))) { + if (!willBeTransactional && !Boolean.valueOf(newTableDescriptor.getValue(PhoenixTransactionContext.READ_NON_TX_DATA))) { props.remove(TTL); } }
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2f2f6cc5/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java index da394c0..ec05b24 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java @@ -80,15 +80,13 @@ import org.apache.phoenix.schema.TableAlreadyExistsException; import org.apache.phoenix.schema.TableNotFoundException; import org.apache.phoenix.schema.stats.GuidePostsInfo; import org.apache.phoenix.schema.stats.GuidePostsKey; +import org.apache.phoenix.transaction.TransactionFactory; import org.apache.phoenix.util.JDBCUtil; import org.apache.phoenix.util.MetaDataUtil; import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.SchemaUtil; import org.apache.phoenix.util.SequenceUtil; -import org.apache.tephra.TransactionManager; -import org.apache.tephra.TransactionSystemClient; -import org.apache.tephra.inmemory.InMemoryTxSystemClient; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -107,7 +105,6 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple private PMetaData metaData; private final Map<SequenceKey, SequenceInfo> sequenceMap = Maps.newHashMap(); private final String userName; - private final TransactionSystemClient txSystemClient; private KeyValueBuilder kvBuilder; private volatile boolean initialized; private volatile SQLException initializationException; @@ -119,7 +116,7 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple super(services); userName = 
connInfo.getPrincipal(); metaData = newEmptyMetaData(); - + // Use KeyValueBuilder that builds real KeyValues, as our test utils require this this.kvBuilder = GenericKeyValueBuilder.INSTANCE; Configuration config = HBaseFactoryProvider.getConfigurationFactory().getConfiguration(); @@ -138,8 +135,7 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple // Without making a copy of the configuration we cons up, we lose some of our properties // on the server side during testing. this.config = HBaseFactoryProvider.getConfigurationFactory().getConfiguration(config); - TransactionManager txnManager = new TransactionManager(config); - this.txSystemClient = new InMemoryTxSystemClient(txnManager); + TransactionFactory.getTransactionFactory().getTransactionContext().setInMemoryTransactionClient(config); this.guidePostsCache = new GuidePostsCache(this, config); } @@ -531,11 +527,6 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple QueryServicesOptions.DEFAULT_SEQUENCE_TABLE_SALT_BUCKETS); } - @Override - public TransactionSystemClient getTransactionSystemClient() { - return txSystemClient; - } - public MetaDataMutationResult createFunction(List<Mutation> functionData, PFunction function, boolean temporary) throws SQLException { return new MetaDataMutationResult(MutationCode.FUNCTION_NOT_FOUND, 0l, null); http://git-wip-us.apache.org/repos/asf/phoenix/blob/2f2f6cc5/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java index 7f7c027..6c464eb 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java +++ 
b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java @@ -47,7 +47,6 @@ import org.apache.phoenix.schema.SequenceAllocation; import org.apache.phoenix.schema.SequenceKey; import org.apache.phoenix.schema.stats.GuidePostsInfo; import org.apache.phoenix.schema.stats.GuidePostsKey; -import org.apache.tephra.TransactionSystemClient; public class DelegateConnectionQueryServices extends DelegateQueryServices implements ConnectionQueryServices { @@ -257,11 +256,6 @@ public class DelegateConnectionQueryServices extends DelegateQueryServices imple } @Override - public TransactionSystemClient getTransactionSystemClient() { - return getDelegate().getTransactionSystemClient(); - } - - @Override public MetaDataMutationResult createFunction(List<Mutation> functionData, PFunction function, boolean temporary) throws SQLException { return getDelegate().createFunction(functionData, function, temporary); http://git-wip-us.apache.org/repos/asf/phoenix/blob/2f2f6cc5/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java index 537d31b..c982e26 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java @@ -217,6 +217,7 @@ import org.apache.phoenix.schema.types.PTimestamp; import org.apache.phoenix.schema.types.PUnsignedLong; import org.apache.phoenix.schema.types.PVarbinary; import org.apache.phoenix.schema.types.PVarchar; +import org.apache.phoenix.transaction.PhoenixTransactionContext; import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.CursorUtil; import org.apache.phoenix.util.EncodedColumnsUtil; @@ -231,7 +232,6 @@ import org.apache.phoenix.util.SchemaUtil; import 
org.apache.phoenix.util.StringUtil; import org.apache.phoenix.util.TransactionUtil; import org.apache.phoenix.util.UpgradeUtil; -import org.apache.tephra.TxConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -1989,7 +1989,7 @@ public class MetaDataClient { // If TTL set, use Tephra TTL property name instead Object ttl = commonFamilyProps.remove(HColumnDescriptor.TTL); if (ttl != null) { - commonFamilyProps.put(TxConstants.PROPERTY_TTL, ttl); + commonFamilyProps.put(PhoenixTransactionContext.PROPERTY_TTL, ttl); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/2f2f6cc5/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java index b0d0e25..20e8611 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java @@ -69,13 +69,13 @@ import org.apache.phoenix.schema.types.PDataType; import org.apache.phoenix.schema.types.PDouble; import org.apache.phoenix.schema.types.PFloat; import org.apache.phoenix.schema.types.PVarchar; +import org.apache.phoenix.transaction.TransactionFactory; import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.EncodedColumnsUtil; import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.SchemaUtil; import org.apache.phoenix.util.SizedUtil; import org.apache.phoenix.util.TrustedByteArrayOutputStream; -import org.apache.tephra.TxConstants; import com.google.common.base.Objects; import com.google.common.base.Preconditions; @@ -1033,11 +1033,11 @@ public class PTableImpl implements PTable { if (PTableImpl.this.isTransactional()) { Put put = new Put(key); if (families.isEmpty()) { - put.add(SchemaUtil.getEmptyColumnFamily(PTableImpl.this), 
TxConstants.FAMILY_DELETE_QUALIFIER, ts, + put.add(SchemaUtil.getEmptyColumnFamily(PTableImpl.this), TransactionFactory.getTransactionFactory().getTransactionContext().getFamilyDeleteMarker(), ts, HConstants.EMPTY_BYTE_ARRAY); } else { for (PColumnFamily colFamily : families) { - put.add(colFamily.getName().getBytes(), TxConstants.FAMILY_DELETE_QUALIFIER, ts, + put.add(colFamily.getName().getBytes(), TransactionFactory.getTransactionFactory().getTransactionContext().getFamilyDeleteMarker(), ts, HConstants.EMPTY_BYTE_ARRAY); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/2f2f6cc5/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java new file mode 100644 index 0000000..d4553ec --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.transaction; + +import java.io.IOException; +import java.sql.SQLException; +import java.util.concurrent.TimeoutException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; +import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo; +import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.util.ReadOnlyProps; +import org.apache.twill.zookeeper.ZKClientService; +import org.slf4j.Logger; + +public class OmidTransactionContext implements PhoenixTransactionContext { + + @Override + public void begin() throws SQLException { + // TODO Auto-generated method stub + + } + + @Override + public void commit() throws SQLException { + // TODO Auto-generated method stub + + } + + @Override + public void abort() throws SQLException { + // TODO Auto-generated method stub + + } + + @Override + public void checkpoint(boolean hasUncommittedData) throws SQLException { + // TODO Auto-generated method stub + + } + + @Override + public void commitDDLFence(PTable dataTable, Logger logger) throws SQLException { + // TODO Auto-generated method stub + + } + + @Override + public void join(PhoenixTransactionContext ctx) { + // TODO Auto-generated method stub + + } + + @Override + public boolean isTransactionRunning() { + // TODO Auto-generated method stub + return false; + } + + @Override + public void reset() { + // TODO Auto-generated method stub + + } + + @Override + public long getTransactionId() { + // TODO Auto-generated method stub + return 0; + } + + @Override + public long getReadPointer() { + // TODO Auto-generated method stub + return 0; + } + + @Override + public long getWritePointer() { + // TODO Auto-generated method stub + return 0; + } + + @Override + public PhoenixVisibilityLevel getVisibilityLevel() { + // TODO Auto-generated method stub + return null; + } + + @Override + public void setVisibilityLevel(PhoenixVisibilityLevel visibilityLevel) { + // TODO 
Auto-generated method stub + + } + + @Override + public byte[] encodeTransaction() throws SQLException { + // TODO Auto-generated method stub + return null; + } + + @Override + public long getMaxTransactionsPerSecond() { + // TODO Auto-generated method stub + return 0; + } + + @Override + public boolean isPreExistingVersion(long version) { + // TODO Auto-generated method stub + return false; + } + + @Override + public BaseRegionObserver getCoProcessor() { + // TODO Auto-generated method stub + return null; + } + + @Override + public void setInMemoryTransactionClient(Configuration config) { + // TODO Auto-generated method stub + + } + + @Override + public ZKClientService setTransactionClient(Configuration config, ReadOnlyProps props, + ConnectionInfo connectionInfo) { + // TODO Auto-generated method stub + + return null; + + } + + @Override + public byte[] getFamilyDeleteMarker() { + // TODO Auto-generated method stub + return null; + } + + @Override + public void setTxnConfigs(Configuration config, String tmpFolder, int defaultTxnTimeoutSeconds) throws IOException { + // TODO Auto-generated method stub + + } + + @Override + public void setupTxManager(Configuration config, String url) throws SQLException { + // TODO Auto-generated method stub + + } + + @Override + public void tearDownTxManager() { + // TODO Auto-generated method stub + + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/2f2f6cc5/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionTable.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionTable.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionTable.java new file mode 100644 index 0000000..54eea8b --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionTable.java @@ -0,0 +1,339 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * 
or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.transaction; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Append; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Durability; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.client.Increment; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Row; +import org.apache.hadoop.hbase.client.RowMutations; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.coprocessor.Batch.Call; +import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback; +import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; +import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; + +import com.google.protobuf.Descriptors.MethodDescriptor; +import com.google.protobuf.Message; +import com.google.protobuf.Service; +import 
com.google.protobuf.ServiceException; + +public class OmidTransactionTable implements PhoenixTransactionalTable { + + public OmidTransactionTable(PhoenixTransactionContext ctx, HTableInterface hTable) { + // TODO Auto-generated constructor stub + } + + @Override + public Result get(Get get) throws IOException { + // TODO Auto-generated method stub + return null; + } + + @Override + public void put(Put put) throws IOException { + // TODO Auto-generated method stub + + } + + @Override + public void delete(Delete delete) throws IOException { + // TODO Auto-generated method stub + + } + + @Override + public ResultScanner getScanner(Scan scan) throws IOException { + // TODO Auto-generated method stub + return null; + } + + @Override + public byte[] getTableName() { + // TODO Auto-generated method stub + return null; + } + + @Override + public Configuration getConfiguration() { + // TODO Auto-generated method stub + return null; + } + + @Override + public HTableDescriptor getTableDescriptor() throws IOException { + // TODO Auto-generated method stub + return null; + } + + @Override + public boolean exists(Get get) throws IOException { + // TODO Auto-generated method stub + return false; + } + + @Override + public Result[] get(List<Get> gets) throws IOException { + // TODO Auto-generated method stub + return null; + } + + @Override + public ResultScanner getScanner(byte[] family) throws IOException { + // TODO Auto-generated method stub + return null; + } + + @Override + public ResultScanner getScanner(byte[] family, byte[] qualifier) + throws IOException { + // TODO Auto-generated method stub + return null; + } + + @Override + public void put(List<Put> puts) throws IOException { + // TODO Auto-generated method stub + } + + @Override + public void delete(List<Delete> deletes) throws IOException { + // TODO Auto-generated method stub + } + + @Override + public void setAutoFlush(boolean autoFlush) { + // TODO Auto-generated method stub + } + + @Override + public boolean 
isAutoFlush() { + // TODO Auto-generated method stub + return false; + } + + @Override + public long getWriteBufferSize() { + // TODO Auto-generated method stub + return 0; + } + + @Override + public void setWriteBufferSize(long writeBufferSize) throws IOException { + // TODO Auto-generated method stub + } + + @Override + public void flushCommits() throws IOException { + // TODO Auto-generated method stub + } + + @Override + public void close() throws IOException { + // TODO Auto-generated method stub + } + + @Override + public long incrementColumnValue(byte[] row, byte[] family, + byte[] qualifier, long amount, boolean writeToWAL) + throws IOException { + // TODO Auto-generated method stub + return 0; + } + + @Override + public Boolean[] exists(List<Get> gets) throws IOException { + // TODO Auto-generated method stub + return null; + } + + @Override + public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) { + // TODO Auto-generated method stub + } + + @Override + public void setAutoFlushTo(boolean autoFlush) { + // TODO Auto-generated method stub + } + + @Override + public Result getRowOrBefore(byte[] row, byte[] family) throws IOException { + // TODO Auto-generated method stub + return null; + } + + @Override + public TableName getName() { + // TODO Auto-generated method stub + return null; + } + + @Override + public boolean[] existsAll(List<Get> gets) throws IOException { + // TODO Auto-generated method stub + return null; + } + + @Override + public void batch(List<? extends Row> actions, Object[] results) + throws IOException, InterruptedException { + // TODO Auto-generated method stub + } + + @Override + public Object[] batch(List<? extends Row> actions) throws IOException, + InterruptedException { + // TODO Auto-generated method stub + return null; + } + + @Override + public <R> void batchCallback(List<? 
extends Row> actions, + Object[] results, Callback<R> callback) throws IOException, + InterruptedException { + // TODO Auto-generated method stub + } + + @Override + public <R> Object[] batchCallback(List<? extends Row> actions, + Callback<R> callback) throws IOException, InterruptedException { + // TODO Auto-generated method stub + return null; + } + + @Override + public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, + byte[] value, Put put) throws IOException { + // TODO Auto-generated method stub + return false; + } + + @Override + public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, + CompareOp compareOp, byte[] value, Put put) throws IOException { + // TODO Auto-generated method stub + return false; + } + + @Override + public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, + byte[] value, Delete delete) throws IOException { + // TODO Auto-generated method stub + return false; + } + + @Override + public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, + CompareOp compareOp, byte[] value, Delete delete) + throws IOException { + // TODO Auto-generated method stub + return false; + } + + @Override + public void mutateRow(RowMutations rm) throws IOException { + // TODO Auto-generated method stub + } + + @Override + public Result append(Append append) throws IOException { + // TODO Auto-generated method stub + return null; + } + + @Override + public Result increment(Increment increment) throws IOException { + // TODO Auto-generated method stub + return null; + } + + @Override + public long incrementColumnValue(byte[] row, byte[] family, + byte[] qualifier, long amount) throws IOException { + // TODO Auto-generated method stub + return 0; + } + + @Override + public long incrementColumnValue(byte[] row, byte[] family, + byte[] qualifier, long amount, Durability durability) + throws IOException { + // TODO Auto-generated method stub + return 0; + } + + @Override + public CoprocessorRpcChannel 
coprocessorService(byte[] row) { + // TODO Auto-generated method stub + return null; + } + + @Override + public <T extends Service, R> Map<byte[], R> coprocessorService( + Class<T> service, byte[] startKey, byte[] endKey, + Call<T, R> callable) throws ServiceException, Throwable { + // TODO Auto-generated method stub + return null; + } + + @Override + public <T extends Service, R> void coprocessorService(Class<T> service, + byte[] startKey, byte[] endKey, Call<T, R> callable, + Callback<R> callback) throws ServiceException, Throwable { + // TODO Auto-generated method stub + } + + @Override + public <R extends Message> Map<byte[], R> batchCoprocessorService( + MethodDescriptor methodDescriptor, Message request, + byte[] startKey, byte[] endKey, R responsePrototype) + throws ServiceException, Throwable { + // TODO Auto-generated method stub + return null; + } + + @Override + public <R extends Message> void batchCoprocessorService( + MethodDescriptor methodDescriptor, Message request, + byte[] startKey, byte[] endKey, R responsePrototype, + Callback<R> callback) throws ServiceException, Throwable { + // TODO Auto-generated method stub + } + + @Override + public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, + CompareOp compareOp, byte[] value, RowMutations mutation) + throws IOException { + // TODO Auto-generated method stub + return false; + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/2f2f6cc5/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java new file mode 100644 index 0000000..d335692 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java @@ -0,0 +1,191 @@ +/* + * Licensed to the 
Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.transaction; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; +import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo; +import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.util.ReadOnlyProps; +import org.apache.twill.zookeeper.ZKClientService; +import org.slf4j.Logger; + +import java.io.IOException; +import java.sql.SQLException; +import java.util.concurrent.TimeoutException; + +public interface PhoenixTransactionContext { + + /** + * + * Visibility levels needed for checkpointing and + * + */ + public enum PhoenixVisibilityLevel { + SNAPSHOT, + SNAPSHOT_EXCLUDE_CURRENT, + SNAPSHOT_ALL + } + + public static final String TX_ROLLBACK_ATTRIBUTE_KEY = "tephra.tx.rollback"; //"phoenix.tx.rollback"; + + public static final String PROPERTY_TTL = "dataset.table.ttl"; + + public static final String READ_NON_TX_DATA = "data.tx.read.pre.existing"; + + /** + * Set the in memory client connection to the transaction manager (for testing purpose) + * + * @param config + */ + public void setInMemoryTransactionClient(Configuration config); + + /** + * Set the client connection to the 
transaction manager + * + * @param config + * @param props + * @param connectionInfo + */ + public ZKClientService setTransactionClient(Configuration config, ReadOnlyProps props, ConnectionInfo connectionInfo); + + /** + * Starts a transaction + * + * @throws SQLException + */ + public void begin() throws SQLException; + + /** + * Commits a transaction + * + * @throws SQLException + */ + public void commit() throws SQLException; + + /** + * Rolls back a transaction + * + * @throws SQLException + */ + public void abort() throws SQLException; + + /** + * Create a checkpoint in a transaction as defined in [TEPHRA-96] + * + * @param hasUncommittedData whether the caller has uncommitted data (NOTE(review): exact meaning is defined by the callers - confirm) + * @throws SQLException + */ + public void checkpoint(boolean hasUncommittedData) throws SQLException; + + /** + * Commit DDL to guarantee that no transaction started before create index + * and committed afterwards, as explained in [PHOENIX-2478], [TEPHRA-157] and [OMID-56]. + * + * @param dataTable the table that the DDL command works on + * @param logger logger made available to the implementation for reporting on the fence + * @throws SQLException the only checked exception declared here; interruption and + * timeout are not part of this signature + */ + public void commitDDLFence(PTable dataTable, Logger logger) + throws SQLException; + + /** + * Augment the current context with ctx modified keys + * + * @param ctx + */ + public void join(PhoenixTransactionContext ctx); + + /** + * Is there a transaction in flight? + */ + public boolean isTransactionRunning(); + + /** + * Reset transaction state + */ + public void reset(); + + /** + * Returns transaction unique identifier + */ + public long getTransactionId(); + + /** + * Returns transaction snapshot id + */ + public long getReadPointer(); + + /** + * Returns transaction write pointer.
After checkpoint the write pointer is different than the initial one + */ + public long getWritePointer(); + + /** + * Set visibility level + */ + public void setVisibilityLevel(PhoenixVisibilityLevel visibilityLevel); + + /** + * Returns visibility level + */ + public PhoenixVisibilityLevel getVisibilityLevel(); + + /** + * Encode transaction + */ + public byte[] encodeTransaction() throws SQLException; + + /** + * + * @return max transactions per second + */ + public long getMaxTransactionsPerSecond(); + + /** + * + * @param version + */ + public boolean isPreExistingVersion(long version); + + /** + * + * @return the coprocessor + */ + public BaseRegionObserver getCoProcessor(); + + /** + * + * @return the family delete marker + */ + public byte[] getFamilyDeleteMarker(); + + /** + * Setup transaction manager's configuration for testing + */ + public void setTxnConfigs(Configuration config, String tmpFolder, int defaultTxnTimeoutSeconds) throws IOException; + + /** + * Setup transaction manager for testing + */ + public void setupTxManager(Configuration config, String url) throws SQLException; + + /** + * Tear down transaction manager for testing + */ + public void tearDownTxManager(); +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/2f2f6cc5/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionalTable.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionalTable.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionalTable.java new file mode 100644 index 0000000..7af1c08 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionalTable.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.transaction; + +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HTableDescriptor; + +import java.io.IOException; +import java.util.List; + +public interface PhoenixTransactionalTable extends HTableInterface { + + /** + * Transaction version of {@link HTableInterface#get(Get get)} + * @param get + * @throws IOException + */ + public Result get(Get get) throws IOException; + + /** + * Transactional version of {@link HTableInterface#put(Put put)} + * @param put + * @throws IOException + */ + public void put(Put put) throws IOException; + + /** + * Transactional version of {@link HTableInterface#delete(Delete delete)} + * + * @param delete + * @throws IOException + */ + public void delete(Delete delete) throws IOException; + + /** + * Transactional version of {@link HTableInterface#getScanner(Scan scan)} + * + * @param scan + * @return 
ResultScanner + * @throws IOException + */ + public ResultScanner getScanner(Scan scan) throws IOException; + + /** + * Returns Htable name + */ + public byte[] getTableName(); + + /** + * Returns Htable configuration object + */ + public Configuration getConfiguration(); + + /** + * Returns HTableDescriptor of Htable + * @throws IOException + */ + public HTableDescriptor getTableDescriptor() throws IOException; + + /** + * Checks if cell exists + * @throws IOException + */ + public boolean exists(Get get) throws IOException; + + /** + * Transactional version of {@link HTableInterface#get(List gets)} + * @throws IOException + */ + public Result[] get(List<Get> gets) throws IOException; + + /** + * Transactional version of {@link HTableInterface#getScanner(byte[] family)} + * @throws IOException + */ + public ResultScanner getScanner(byte[] family) throws IOException; + + /** + * Transactional version of {@link HTableInterface#getScanner(byte[] family, byte[] qualifier)} + * @throws IOException + */ + public ResultScanner getScanner(byte[] family, byte[] qualifier) throws IOException; + + /** + * Transactional version of {@link HTableInterface#put(List puts)} + * @throws IOException + */ + public void put(List<Put> puts) throws IOException; + + /** + * Transactional version of {@link HTableInterface#delete(List deletes)} + * @throws IOException + */ + public void delete(List<Delete> deletes) throws IOException; + + /** + * Delegates to {@link HTable#setAutoFlush(boolean autoFlush)} + */ + public void setAutoFlush(boolean autoFlush); + + /** + * Delegates to {@link HTable#isAutoFlush()} + */ + public boolean isAutoFlush(); + + /** + * Delegates to see HTable.getWriteBufferSize() + */ + public long getWriteBufferSize(); + + /** + * Delegates to see HTable.setWriteBufferSize() + */ + public void setWriteBufferSize(long writeBufferSize) throws IOException; + + /** + * Delegates to see HTable.flushCommits() + */ + public void flushCommits() throws IOException; + + /** + 
* Releases resources + * @throws IOException + */ + public void close() throws IOException; +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/2f2f6cc5/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java new file mode 100644 index 0000000..83e7f99 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java @@ -0,0 +1,516 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.transaction; + +import java.io.IOException; +import java.sql.SQLException; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.conf.Configuration; +import org.apache.phoenix.exception.SQLExceptionCode; +import org.apache.phoenix.exception.SQLExceptionInfo; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo; +import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.util.ReadOnlyProps; +import org.apache.tephra.Transaction; +import org.apache.tephra.TransactionAware; +import org.apache.tephra.TransactionCodec; +import org.apache.tephra.TransactionConflictException; +import org.apache.tephra.TransactionContext; +import org.apache.tephra.TransactionFailureException; +import org.apache.tephra.TransactionManager; +import org.apache.tephra.TransactionSystemClient; +import org.apache.tephra.Transaction.VisibilityLevel; +import org.apache.tephra.TxConstants; +import org.apache.tephra.distributed.PooledClientProvider; +import org.apache.tephra.distributed.TransactionServiceClient; +import org.apache.tephra.hbase.coprocessor.TransactionProcessor; +import org.apache.tephra.inmemory.InMemoryTxSystemClient; +import org.apache.tephra.util.TxUtils; +import org.apache.tephra.visibility.FenceWait; +import org.apache.tephra.visibility.VisibilityFence; +import org.apache.tephra.zookeeper.TephraZKClientService; +import org.apache.tephra.distributed.TransactionService; +import org.apache.tephra.metrics.TxMetricsCollector; +import org.apache.tephra.persist.HDFSTransactionStateStorage; +import org.apache.tephra.snapshot.SnapshotCodecProvider; +import org.apache.twill.discovery.DiscoveryService; +import 
org.apache.twill.discovery.ZKDiscoveryService; +import org.apache.twill.internal.utils.Networks; +import org.apache.twill.zookeeper.RetryStrategies; +import org.apache.twill.zookeeper.ZKClientService; +import org.apache.twill.zookeeper.ZKClientServices; +import org.apache.twill.zookeeper.ZKClients; + +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.Lists; +import com.google.inject.util.Providers; + +import org.slf4j.Logger; + +public class TephraTransactionContext implements PhoenixTransactionContext { + + private static final TransactionCodec CODEC = new TransactionCodec(); + + private static TransactionSystemClient txClient = null; + private static ZKClientService zkClient = null; + private static TransactionService txService = null; + private static TransactionManager txManager = null; + + private final List<TransactionAware> txAwares; + private final TransactionContext txContext; + private Transaction tx; + private TransactionSystemClient txServiceClient; + private TransactionFailureException e; + + public TephraTransactionContext() { + this.txServiceClient = null; + this.txAwares = Lists.newArrayList(); + this.txContext = null; + } + + public TephraTransactionContext(byte[] txnBytes) throws IOException { + this(); + this.tx = (txnBytes != null && txnBytes.length > 0) ? 
CODEC + .decode(txnBytes) : null; + } + + public TephraTransactionContext(PhoenixConnection connection) { + this.txServiceClient = txClient; + this.txAwares = Collections.emptyList(); + this.txContext = new TransactionContext(txServiceClient); + } + + public TephraTransactionContext(PhoenixTransactionContext ctx, + PhoenixConnection connection, boolean subTask) { + this.txServiceClient = txClient; + assert (ctx instanceof TephraTransactionContext); + TephraTransactionContext tephraTransactionContext = (TephraTransactionContext) ctx; + + if (subTask) { + this.tx = tephraTransactionContext.getTransaction(); + this.txAwares = Lists.newArrayList(); + this.txContext = null; + } else { + this.txAwares = Collections.emptyList(); + this.txContext = tephraTransactionContext.getContext(); + } + + this.e = null; + } + + @Override + public void setInMemoryTransactionClient(Configuration config) { + TransactionManager txnManager = new TransactionManager(config); + txClient = this.txServiceClient = new InMemoryTxSystemClient(txnManager); + } + + @Override + public ZKClientService setTransactionClient(Configuration config, ReadOnlyProps props, ConnectionInfo connectionInfo) { + String zkQuorumServersString = props.get(TxConstants.Service.CFG_DATA_TX_ZOOKEEPER_QUORUM); + if (zkQuorumServersString==null) { + zkQuorumServersString = connectionInfo.getZookeeperQuorum()+":"+connectionInfo.getPort(); + } + + int timeOut = props.getInt(HConstants.ZK_SESSION_TIMEOUT, HConstants.DEFAULT_ZK_SESSION_TIMEOUT); + // Create instance of the tephra zookeeper client + ZKClientService txZKClientService = ZKClientServices.delegate( + ZKClients.reWatchOnExpire( + ZKClients.retryOnFailure( + new TephraZKClientService(zkQuorumServersString, timeOut, null, + ArrayListMultimap.<String, byte[]>create()), + RetryStrategies.exponentialDelay(500, 2000, TimeUnit.MILLISECONDS)) + ) + ); + txZKClientService.startAndWait(); + ZKDiscoveryService zkDiscoveryService = new ZKDiscoveryService(txZKClientService); + 
PooledClientProvider pooledClientProvider = new PooledClientProvider( + config, zkDiscoveryService); + txClient = this.txServiceClient = new TransactionServiceClient(config,pooledClientProvider); + + return txZKClientService; + } + + @Override + public void begin() throws SQLException { + if (txContext == null) { + throw new SQLExceptionInfo.Builder( + SQLExceptionCode.NULL_TRANSACTION_CONTEXT).build() + .buildException(); + } + + try { + txContext.start(); + } catch (TransactionFailureException e) { + throw new SQLExceptionInfo.Builder( + SQLExceptionCode.TRANSACTION_FAILED) + .setMessage(e.getMessage()).setRootCause(e).build() + .buildException(); + } + } + + @Override + public void commit() throws SQLException { + + if (txContext == null || !isTransactionRunning()) { + return; + } + + try { + txContext.finish(); + } catch (TransactionFailureException e) { + this.e = e; + + if (e instanceof TransactionConflictException) { + throw new SQLExceptionInfo.Builder( + SQLExceptionCode.TRANSACTION_CONFLICT_EXCEPTION) + .setMessage(e.getMessage()).setRootCause(e).build() + .buildException(); + } + throw new SQLExceptionInfo.Builder( + SQLExceptionCode.TRANSACTION_FAILED) + .setMessage(e.getMessage()).setRootCause(e).build() + .buildException(); + } + } + + @Override + public void abort() throws SQLException { + + if (txContext == null || !isTransactionRunning()) { + return; + } + + try { + if (e != null) { + txContext.abort(e); + e = null; + } else { + txContext.abort(); + } + } catch (TransactionFailureException e) { + this.e = null; + throw new SQLExceptionInfo.Builder( + SQLExceptionCode.TRANSACTION_FAILED) + .setMessage(e.getMessage()).setRootCause(e).build() + .buildException(); + } + } + + @Override + public void checkpoint(boolean hasUncommittedData) throws SQLException { + if (hasUncommittedData) { + try { + if (txContext == null) { + tx = txServiceClient.checkpoint(tx); + } else { + assert (txContext != null); + txContext.checkpoint(); + tx = 
txContext.getCurrentTransaction(); + } + } catch (TransactionFailureException e) { + throw new SQLException(e); + } + } + + // Since we're querying our own table while mutating it, we must not + // see our current mutations, otherwise we can get erroneous results + // (for DELETE) + // or get into an infinite loop (for UPSERT SELECT). + if (txContext == null) { + tx.setVisibility(VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT); + } else { + assert (txContext != null); + txContext.getCurrentTransaction().setVisibility( + VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT); + } + } + + /** + * Prepares a visibility fence wait keyed on the table name and waits up to + * 10 seconds for it. + * + * @param dataTable the table that the DDL command works on + * @param logger receives an info-level message once the wait has returned + * @throws SQLException if the wait is interrupted, times out, or the transaction + * system reports a failure; the triggering exception is kept as root cause + */ + @Override + public void commitDDLFence(PTable dataTable, Logger logger) + throws SQLException { + byte[] key = dataTable.getName().getBytes(); + + try { + FenceWait fenceWait = VisibilityFence.prepareWait(key, + txServiceClient); + fenceWait.await(10000, TimeUnit.MILLISECONDS); + + if (logger.isInfoEnabled()) { + // getReadPointer() returns -1 when no transaction is running, + // so logging can never fail with a NullPointerException. + logger.info("Added write fence at ~" + + getReadPointer()); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new SQLExceptionInfo.Builder( + SQLExceptionCode.INTERRUPTED_EXCEPTION).setRootCause(e) + .build().buildException(); + } catch (TimeoutException | TransactionFailureException e) { + // Keep the underlying failure attached so callers can diagnose it. + throw new SQLExceptionInfo.Builder( + SQLExceptionCode.TX_UNABLE_TO_GET_WRITE_FENCE) + .setSchemaName(dataTable.getSchemaName().getString()) + .setTableName(dataTable.getTableName().getString()) + .setRootCause(e).build() + .buildException(); + } + } + + public void markDMLFence(PTable table) { + byte[] logicalKey = table.getName().getBytes(); + TransactionAware logicalTxAware = VisibilityFence.create(logicalKey); + + if (this.txContext == null) { + this.txAwares.add(logicalTxAware); + } else { + this.txContext.addTransactionAware(logicalTxAware); + } + + byte[] physicalKey = table.getPhysicalName().getBytes(); + if (Bytes.compareTo(physicalKey, logicalKey) != 0) { + TransactionAware physicalTxAware = VisibilityFence + .create(physicalKey); + if (this.txContext
== null) { + this.txAwares.add(physicalTxAware); + } else { + this.txContext.addTransactionAware(physicalTxAware); + } + } + } + + @Override + public void join(PhoenixTransactionContext ctx) { + assert (ctx instanceof TephraTransactionContext); + TephraTransactionContext tephraContext = (TephraTransactionContext) ctx; + + if (txContext != null) { + for (TransactionAware txAware : tephraContext.getAwares()) { + txContext.addTransactionAware(txAware); + } + } else { + txAwares.addAll(tephraContext.getAwares()); + } + } + + private Transaction getCurrentTransaction() { + return tx != null ? tx : txContext != null ? txContext.getCurrentTransaction() : null; + } + + @Override + public boolean isTransactionRunning() { + return getCurrentTransaction() != null; + } + + @Override + public void reset() { + tx = null; + txAwares.clear(); + this.e = null; + } + + @Override + public long getTransactionId() { + Transaction tx = getCurrentTransaction(); + return tx == null ? HConstants.LATEST_TIMESTAMP : tx.getTransactionId(); // First write pointer - won't change with checkpointing + } + + @Override + public long getReadPointer() { + Transaction tx = getCurrentTransaction(); + + if (tx == null) { + return (-1); + } + + return tx.getReadPointer(); + } + + // For testing + @Override + public long getWritePointer() { + Transaction tx = getCurrentTransaction(); + return tx == null ? 
HConstants.LATEST_TIMESTAMP : tx.getWritePointer(); + } + + /** + * Maps the Phoenix visibility level onto the Tephra one and applies it to the + * current transaction. + * + * @param visibilityLevel the Phoenix-level visibility to apply + */ + @Override + public void setVisibilityLevel(PhoenixVisibilityLevel visibilityLevel) { + VisibilityLevel tephraVisibilityLevel = null; + + switch (visibilityLevel) { + case SNAPSHOT: + tephraVisibilityLevel = VisibilityLevel.SNAPSHOT; + break; + case SNAPSHOT_EXCLUDE_CURRENT: + tephraVisibilityLevel = VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT; + break; + case SNAPSHOT_ALL: + tephraVisibilityLevel = VisibilityLevel.SNAPSHOT_ALL; + break; + default: + assert (false); + } + + Transaction tx = getCurrentTransaction(); + assert(tx != null); + tx.setVisibility(tephraVisibilityLevel); + } + + /** + * Reads the Tephra visibility level of the current transaction and maps it back + * to the Phoenix enum. + * + * @return the matching Phoenix visibility level, or null for a Tephra level + * that has no Phoenix counterpart + */ + @Override + public PhoenixVisibilityLevel getVisibilityLevel() { + VisibilityLevel visibilityLevel = null; + + Transaction tx = getCurrentTransaction(); + assert(tx != null); + visibilityLevel = tx.getVisibilityLevel(); + + PhoenixVisibilityLevel phoenixVisibilityLevel; + switch (visibilityLevel) { + case SNAPSHOT: + phoenixVisibilityLevel = PhoenixVisibilityLevel.SNAPSHOT; + break; + case SNAPSHOT_EXCLUDE_CURRENT: + phoenixVisibilityLevel = PhoenixVisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT; + break; + case SNAPSHOT_ALL: + phoenixVisibilityLevel = PhoenixVisibilityLevel.SNAPSHOT_ALL; + // Without this break the case fell through to default and returned null. + break; + default: + phoenixVisibilityLevel = null; + } + + return phoenixVisibilityLevel; + } + + @Override + public byte[] encodeTransaction() throws SQLException { + Transaction tx = getCurrentTransaction(); + assert (tx != null); + + try { + return CODEC.encode(tx); + } catch (IOException e) { + throw new SQLException(e); + } + } + + @Override + public long getMaxTransactionsPerSecond() { + return TxConstants.MAX_TX_PER_MS; + } + + @Override + public boolean isPreExistingVersion(long version) { + return TxUtils.isPreExistingVersion(version); + } + + @Override + public BaseRegionObserver getCoProcessor() { + return new TransactionProcessor(); + } + + @Override + public byte[]
getFamilyDeleteMarker() { + return TxConstants.FAMILY_DELETE_QUALIFIER; + } + + @Override + public void setTxnConfigs(Configuration config, String tmpFolder, int defaultTxnTimeoutSeconds) throws IOException { + config.setBoolean(TxConstants.Manager.CFG_DO_PERSIST, false); + config.set(TxConstants.Service.CFG_DATA_TX_CLIENT_RETRY_STRATEGY, "n-times"); + config.setInt(TxConstants.Service.CFG_DATA_TX_CLIENT_ATTEMPTS, 1); + config.setInt(TxConstants.Service.CFG_DATA_TX_BIND_PORT, Networks.getRandomPort()); + config.set(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR, tmpFolder); + config.setInt(TxConstants.Manager.CFG_TX_TIMEOUT, defaultTxnTimeoutSeconds); + config.unset(TxConstants.Manager.CFG_TX_HDFS_USER); + config.setLong(TxConstants.Manager.CFG_TX_SNAPSHOT_INTERVAL, 5L); + } + + @Override + public void setupTxManager(Configuration config, String url) throws SQLException { + + if (txService != null) { + return; + } + + ConnectionInfo connInfo = ConnectionInfo.create(url); + zkClient = ZKClientServices.delegate( + ZKClients.reWatchOnExpire( + ZKClients.retryOnFailure( + ZKClientService.Builder.of(connInfo.getZookeeperConnectionString()) + .setSessionTimeout(config.getInt(HConstants.ZK_SESSION_TIMEOUT, + HConstants.DEFAULT_ZK_SESSION_TIMEOUT)) + .build(), + RetryStrategies.exponentialDelay(500, 2000, TimeUnit.MILLISECONDS) + ) + ) + ); + zkClient.startAndWait(); + + DiscoveryService discovery = new ZKDiscoveryService(zkClient); + txManager = new TransactionManager(config, new HDFSTransactionStateStorage(config, new SnapshotCodecProvider(config), new TxMetricsCollector()), new TxMetricsCollector()); + txService = new TransactionService(config, zkClient, discovery, Providers.of(txManager)); + txService.startAndWait(); + } + + @Override + public void tearDownTxManager() { + try { + if (txService != null) txService.stopAndWait(); + } finally { + try { + if (zkClient != null) zkClient.stopAndWait(); + } finally { + txService = null; + zkClient = null; + txManager = null; + } + } 
+ } + + /** + * TephraTransactionContext specific functions + */ + + Transaction getTransaction() { + return this.getCurrentTransaction(); + } + + TransactionContext getContext() { + return this.txContext; + } + + List<TransactionAware> getAwares() { + return txAwares; + } + + void addTransactionAware(TransactionAware txAware) { + if (this.txContext != null) { + txContext.addTransactionAware(txAware); + } else if (this.tx != null) { + txAwares.add(txAware); + assert (tx != null); + txAware.startTx(tx); + } + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/2f2f6cc5/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionTable.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionTable.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionTable.java new file mode 100644 index 0000000..cf48521 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionTable.java @@ -0,0 +1,330 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package org.apache.phoenix.transaction;

import java.io.IOException;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.coprocessor.Batch.Call;
import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
import org.apache.tephra.TxConstants;
import org.apache.tephra.hbase.TransactionAwareHTable;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTableType;

import com.google.protobuf.Descriptors.MethodDescriptor;
import com.google.protobuf.Message;
import com.google.protobuf.Service;
import com.google.protobuf.ServiceException;

/**
 * {@link PhoenixTransactionalTable} implementation backed by a Tephra
 * {@link TransactionAwareHTable}.
 *
 * <p>The constructor wraps the supplied HBase table in a
 * {@code TransactionAwareHTable} and registers that wrapper with the given
 * {@link TephraTransactionContext}. Every table operation declared below is
 * forwarded unchanged to the wrapper; this class adds no behavior of its own.
 */
public class TephraTransactionTable implements PhoenixTransactionalTable {

    /** Tephra wrapper that all table operations are forwarded to. */
    private final TransactionAwareHTable transactionAwareHTable;

    /** Context the wrapper was registered with. */
    private final TephraTransactionContext tephraTransactionContext;

    /**
     * Creates a transactional view of {@code hTable} without Phoenix table metadata.
     * Equivalent to calling the three-argument constructor with a {@code null} {@code PTable}.
     *
     * @param ctx transaction context; must be a {@link TephraTransactionContext}
     * @param hTable HBase table to wrap
     */
    public TephraTransactionTable(PhoenixTransactionContext ctx, HTableInterface hTable) {
        this(ctx, hTable, null);
    }

    /**
     * Creates a transactional view of {@code hTable}.
     *
     * <p>Conflict detection is {@code NONE} when {@code pTable} is non-null and reports
     * immutable rows, otherwise {@code ROW}. When {@code pTable} is non-null and is not an
     * index table, a DML fence is marked on the context for it.
     *
     * @param ctx transaction context; must be a {@link TephraTransactionContext}
     * @param hTable HBase table to wrap
     * @param pTable Phoenix metadata for the table, or {@code null} if unavailable
     * @throws IllegalArgumentException if {@code ctx} is not a {@code TephraTransactionContext}
     */
    public TephraTransactionTable(PhoenixTransactionContext ctx, HTableInterface hTable, PTable pTable) {
        // An assert is a no-op unless the JVM runs with -ea, which would leave a bare
        // ClassCastException (or a late NullPointerException for null) as the only signal.
        if (!(ctx instanceof TephraTransactionContext)) {
            throw new IllegalArgumentException("Expected a TephraTransactionContext but got "
                    + (ctx == null ? "null" : ctx.getClass().getName()));
        }
        tephraTransactionContext = (TephraTransactionContext) ctx;

        boolean immutableRows = pTable != null && pTable.isImmutableRows();
        TxConstants.ConflictDetection conflictLevel = immutableRows
                ? TxConstants.ConflictDetection.NONE
                : TxConstants.ConflictDetection.ROW;
        transactionAwareHTable = new TransactionAwareHTable(hTable, conflictLevel);

        tephraTransactionContext.addTransactionAware(transactionAwareHTable);

        // Index tables are deliberately excluded from the DML fence.
        if (pTable != null && pTable.getType() != PTableType.INDEX) {
            tephraTransactionContext.markDMLFence(pTable);
        }
    }

    // ---------------------------------------------------------------------
    // Single-row and multi-row reads/writes
    // ---------------------------------------------------------------------

    @Override
    public Result get(Get get) throws IOException {
        return transactionAwareHTable.get(get);
    }

    @Override
    public void put(Put put) throws IOException {
        transactionAwareHTable.put(put);
    }

    @Override
    public void delete(Delete delete) throws IOException {
        transactionAwareHTable.delete(delete);
    }

    @Override
    public ResultScanner getScanner(Scan scan) throws IOException {
        return transactionAwareHTable.getScanner(scan);
    }

    @Override
    public byte[] getTableName() {
        return transactionAwareHTable.getTableName();
    }

    @Override
    public Configuration getConfiguration() {
        return transactionAwareHTable.getConfiguration();
    }

    @Override
    public HTableDescriptor getTableDescriptor() throws IOException {
        return transactionAwareHTable.getTableDescriptor();
    }

    @Override
    public boolean exists(Get get) throws IOException {
        return transactionAwareHTable.exists(get);
    }

    @Override
    public Result[] get(List<Get> gets) throws IOException {
        return transactionAwareHTable.get(gets);
    }

    @Override
    public ResultScanner getScanner(byte[] family) throws IOException {
        return transactionAwareHTable.getScanner(family);
    }

    @Override
    public ResultScanner getScanner(byte[] family, byte[] qualifier)
            throws IOException {
        return transactionAwareHTable.getScanner(family, qualifier);
    }

    @Override
    public void put(List<Put> puts) throws IOException {
        transactionAwareHTable.put(puts);
    }

    @Override
    public void delete(List<Delete> deletes) throws IOException {
        transactionAwareHTable.delete(deletes);
    }

    // ---------------------------------------------------------------------
    // Write buffer / flush control and lifecycle
    // ---------------------------------------------------------------------

    @Override
    public void setAutoFlush(boolean autoFlush) {
        transactionAwareHTable.setAutoFlush(autoFlush);
    }

    @Override
    public boolean isAutoFlush() {
        return transactionAwareHTable.isAutoFlush();
    }

    @Override
    public long getWriteBufferSize() {
        return transactionAwareHTable.getWriteBufferSize();
    }

    @Override
    public void setWriteBufferSize(long writeBufferSize) throws IOException {
        transactionAwareHTable.setWriteBufferSize(writeBufferSize);
    }

    @Override
    public void flushCommits() throws IOException {
        transactionAwareHTable.flushCommits();
    }

    @Override
    public void close() throws IOException {
        transactionAwareHTable.close();
    }

    @Override
    public long incrementColumnValue(byte[] row, byte[] family,
            byte[] qualifier, long amount, boolean writeToWAL)
            throws IOException {
        return transactionAwareHTable.incrementColumnValue(row, family, qualifier, amount, writeToWAL);
    }

    @Override
    public Boolean[] exists(List<Get> gets) throws IOException {
        return transactionAwareHTable.exists(gets);
    }

    @Override
    public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) {
        transactionAwareHTable.setAutoFlush(autoFlush, clearBufferOnFail);
    }

    /**
     * Forwards to the wrapped table's own {@code setAutoFlushTo}.
     *
     * <p>Previously this forwarded to the single-argument {@code setAutoFlush}; routing to
     * the same-named method keeps this class a pure pass-through.
     * NOTE(review): HBase documents the single-argument {@code setAutoFlush} as deprecated
     * with different {@code clearBufferOnFail} handling in some versions - confirm against
     * the HBase/Tephra versions in use.
     */
    @Override
    public void setAutoFlushTo(boolean autoFlush) {
        transactionAwareHTable.setAutoFlushTo(autoFlush);
    }

    @Override
    public Result getRowOrBefore(byte[] row, byte[] family) throws IOException {
        return transactionAwareHTable.getRowOrBefore(row, family);
    }

    @Override
    public TableName getName() {
        return transactionAwareHTable.getName();
    }

    @Override
    public boolean[] existsAll(List<Get> gets) throws IOException {
        return transactionAwareHTable.existsAll(gets);
    }

    // ---------------------------------------------------------------------
    // Batch operations
    // ---------------------------------------------------------------------

    @Override
    public void batch(List<? extends Row> actions, Object[] results)
            throws IOException, InterruptedException {
        transactionAwareHTable.batch(actions, results);
    }

    @Override
    public Object[] batch(List<? extends Row> actions) throws IOException,
            InterruptedException {
        return transactionAwareHTable.batch(actions);
    }

    @Override
    public <R> void batchCallback(List<? extends Row> actions,
            Object[] results, Callback<R> callback) throws IOException,
            InterruptedException {
        transactionAwareHTable.batchCallback(actions, results, callback);
    }

    @Override
    public <R> Object[] batchCallback(List<? extends Row> actions,
            Callback<R> callback) throws IOException, InterruptedException {
        return transactionAwareHTable.batchCallback(actions, callback);
    }

    // ---------------------------------------------------------------------
    // Conditional and atomic mutations
    // ---------------------------------------------------------------------

    @Override
    public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier,
            byte[] value, Put put) throws IOException {
        return transactionAwareHTable.checkAndPut(row, family, qualifier, value, put);
    }

    @Override
    public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier,
            CompareOp compareOp, byte[] value, Put put) throws IOException {
        return transactionAwareHTable.checkAndPut(row, family, qualifier, compareOp, value, put);
    }

    @Override
    public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
            byte[] value, Delete delete) throws IOException {
        return transactionAwareHTable.checkAndDelete(row, family, qualifier, value, delete);
    }

    @Override
    public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
            CompareOp compareOp, byte[] value, Delete delete)
            throws IOException {
        return transactionAwareHTable.checkAndDelete(row, family, qualifier, compareOp, value, delete);
    }

    @Override
    public void mutateRow(RowMutations rm) throws IOException {
        transactionAwareHTable.mutateRow(rm);
    }

    @Override
    public Result append(Append append) throws IOException {
        return transactionAwareHTable.append(append);
    }

    @Override
    public Result increment(Increment increment) throws IOException {
        return transactionAwareHTable.increment(increment);
    }

    @Override
    public long incrementColumnValue(byte[] row, byte[] family,
            byte[] qualifier, long amount) throws IOException {
        return transactionAwareHTable.incrementColumnValue(row, family, qualifier, amount);
    }

    @Override
    public long incrementColumnValue(byte[] row, byte[] family,
            byte[] qualifier, long amount, Durability durability)
            throws IOException {
        return transactionAwareHTable.incrementColumnValue(row, family, qualifier, amount, durability);
    }

    // ---------------------------------------------------------------------
    // Coprocessor endpoints
    // ---------------------------------------------------------------------

    @Override
    public CoprocessorRpcChannel coprocessorService(byte[] row) {
        return transactionAwareHTable.coprocessorService(row);
    }

    @Override
    public <T extends Service, R> Map<byte[], R> coprocessorService(
            Class<T> service, byte[] startKey, byte[] endKey,
            Call<T, R> callable) throws ServiceException, Throwable {
        return transactionAwareHTable.coprocessorService(service, startKey, endKey, callable);
    }

    @Override
    public <T extends Service, R> void coprocessorService(Class<T> service,
            byte[] startKey, byte[] endKey, Call<T, R> callable,
            Callback<R> callback) throws ServiceException, Throwable {
        transactionAwareHTable.coprocessorService(service, startKey, endKey, callable, callback);
    }

    @Override
    public <R extends Message> Map<byte[], R> batchCoprocessorService(
            MethodDescriptor methodDescriptor, Message request,
            byte[] startKey, byte[] endKey, R responsePrototype)
            throws ServiceException, Throwable {
        return transactionAwareHTable.batchCoprocessorService(methodDescriptor, request, startKey, endKey,
                responsePrototype);
    }

    @Override
    public <R extends Message> void batchCoprocessorService(
            MethodDescriptor methodDescriptor, Message request,
            byte[] startKey, byte[] endKey, R responsePrototype,
            Callback<R> callback) throws ServiceException, Throwable {
        transactionAwareHTable.batchCoprocessorService(methodDescriptor, request, startKey, endKey,
                responsePrototype, callback);
    }

    @Override
    public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier,
            CompareOp compareOp, byte[] value, RowMutations mutation)
            throws IOException {
        return transactionAwareHTable.checkAndMutate(row, family, qualifier, compareOp, value, mutation);
    }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2f2f6cc5/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionFactory.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionFactory.java new file mode 100644 index 0000000..8b3fc1d --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionFactory.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */
package org.apache.phoenix.transaction;

import java.io.IOException;

import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.phoenix.jdbc.PhoenixConnection;

/**
 * Process-wide factory for transaction contexts and transactional tables.
 *
 * <p>A single instance is created lazily and bound to one
 * {@link TransactionProcessor}; if nothing selected a processor beforehand,
 * {@link #getTransactionFactory()} falls back to {@code Tephra}. The factory
 * methods switch on that processor to pick the concrete implementation.
 *
 * <p>Only the no-argument {@link #getTransactionContext()} has an Omid
 * implementation wired in; the other factory methods return {@code null} when
 * the processor is {@code Omid}.
 */
public class TransactionFactory {

    /** Lazily created singleton; guarded by the class lock of the static accessors. */
    private static TransactionFactory transactionFactory = null;

    /** Processor this factory creates objects for; fixed at construction. */
    private final TransactionProcessor tp;

    /** Transaction providers the factory can dispatch to. */
    enum TransactionProcessor {
        Tephra,
        Omid
    }

    private TransactionFactory(TransactionProcessor tp) {
        this.tp = tp;
    }

    /**
     * Creates the singleton for the given processor if none exists yet.
     * A call made after the singleton exists is ignored, so the first caller wins.
     *
     * <p>Synchronized so that concurrent first callers cannot each install a
     * different instance.
     *
     * @param tp processor the singleton should be bound to
     */
    public static synchronized void createTransactionFactory(TransactionProcessor tp) {
        if (transactionFactory == null) {
            transactionFactory = new TransactionFactory(tp);
        }
    }

    /**
     * Returns the singleton, creating a Tephra-bound one on first use.
     *
     * <p>Synchronized together with {@link #createTransactionFactory} so the
     * check-then-create sequence is atomic and the instance is safely published.
     *
     * @return the shared factory instance
     */
    public static synchronized TransactionFactory getTransactionFactory() {
        if (transactionFactory == null) {
            createTransactionFactory(TransactionProcessor.Tephra);
        }
        return transactionFactory;
    }

    /**
     * Creates a fresh, empty transaction context for the configured processor.
     *
     * @return a new {@code TephraTransactionContext} or {@code OmidTransactionContext}
     */
    public PhoenixTransactionContext getTransactionContext() {
        switch (tp) {
        case Tephra:
            return new TephraTransactionContext();
        case Omid:
            return new OmidTransactionContext();
        default:
            return null;
        }
    }

    /**
     * Creates a transaction context from serialized transaction bytes.
     *
     * @param txnBytes serialized transaction, passed to the context constructor
     * @return the new context, or {@code null} when the processor is Omid
     * @throws IOException declared for the Tephra context constructor
     */
    public PhoenixTransactionContext getTransactionContext(byte[] txnBytes) throws IOException {
        switch (tp) {
        case Tephra:
            return new TephraTransactionContext(txnBytes);
        case Omid:
            // TODO: not implemented; intended call is new OmidTransactionContext(txnBytes).
            return null;
        default:
            return null;
        }
    }

    /**
     * Creates a transaction context bound to a Phoenix connection.
     *
     * @param connection connection passed to the context constructor
     * @return the new context, or {@code null} when the processor is Omid
     */
    public PhoenixTransactionContext getTransactionContext(PhoenixConnection connection) {
        switch (tp) {
        case Tephra:
            return new TephraTransactionContext(connection);
        case Omid:
            // TODO: not implemented; intended call is new OmidTransactionContext(connection).
            return null;
        default:
            return null;
        }
    }

    /**
     * Creates a transaction context derived from an existing one.
     *
     * @param contex existing context passed to the context constructor
     * @param connection connection passed to the context constructor
     * @param subTask flag passed to the context constructor
     * @return the new context, or {@code null} when the processor is Omid
     */
    public PhoenixTransactionContext getTransactionContext(PhoenixTransactionContext contex, PhoenixConnection connection, boolean subTask) {
        switch (tp) {
        case Tephra:
            return new TephraTransactionContext(contex, connection, subTask);
        case Omid:
            // TODO: not implemented; intended call is
            // new OmidTransactionContext(contex, connection, subTask).
            return null;
        default:
            return null;
        }
    }

    /**
     * Wraps an HBase table in the transactional table type of the configured processor.
     *
     * @param ctx transaction context the table participates in
     * @param htable HBase table to wrap
     * @return the transactional table, or {@code null} when the processor is Omid
     */
    public PhoenixTransactionalTable getTransactionalTable(PhoenixTransactionContext ctx, HTableInterface htable) {
        switch (tp) {
        case Tephra:
            return new TephraTransactionTable(ctx, htable);
        case Omid:
            // TODO: not implemented. The earlier commented-out placeholder here built a
            // transaction context, which looked like a copy-paste from the methods above;
            // an Omid table wrapper taking (ctx, htable) is presumably what is needed.
            return null;
        default:
            return null;
        }
    }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2f2f6cc5/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java index 2f65647..b81b904 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java @@ -111,7 +111,7 @@ import org.apache.phoenix.schema.types.PDecimal; import org.apache.phoenix.schema.types.PLong; import org.apache.phoenix.schema.types.PVarbinary; import org.apache.phoenix.schema.types.PVarchar; -import org.apache.tephra.TxConstants; +import org.apache.phoenix.transaction.PhoenixTransactionContext; import com.google.common.collect.Lists; import com.google.protobuf.ServiceException; @@ -274,7 +274,7 @@ public class IndexUtil { regionEndkey = tableRegionLocation.getRegionInfo().getEndKey(); } Delete delete = maintainer.buildDeleteMutation(kvBuilder, null, ptr, Collections.<KeyValue>emptyList(), ts, regionStartKey, regionEndkey); - delete.setAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY, dataMutation.getAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY)); +
delete.setAttribute(PhoenixTransactionContext.TX_ROLLBACK_ATTRIBUTE_KEY, dataMutation.getAttribute(PhoenixTransactionContext.TX_ROLLBACK_ATTRIBUTE_KEY)); indexMutations.add(delete); } return indexMutations; http://git-wip-us.apache.org/repos/asf/phoenix/blob/2f2f6cc5/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java index 0e3a9e9..30361c8 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java @@ -86,7 +86,7 @@ import org.apache.phoenix.schema.RowKeyValueAccessor; import org.apache.phoenix.schema.TableNotFoundException; import org.apache.phoenix.schema.ValueBitSet; import org.apache.phoenix.schema.types.PDataType; -import org.apache.tephra.util.TxUtils; +import org.apache.phoenix.transaction.TransactionFactory; import com.google.common.base.Joiner; import com.google.common.base.Splitter; @@ -1462,7 +1462,7 @@ public class PhoenixRuntime { * @return wall clock time in milliseconds (i.e. Epoch time) of a given Cell time stamp. */ public static long getWallClockTimeFromCellTimeStamp(long tsOfCell) { - return TxUtils.isPreExistingVersion(tsOfCell) ? tsOfCell : TransactionUtil.convertToMilliseconds(tsOfCell); + return TransactionFactory.getTransactionFactory().getTransactionContext().isPreExistingVersion(tsOfCell) ? tsOfCell : TransactionUtil.convertToMilliseconds(tsOfCell); } public static long getCurrentScn(ReadOnlyProps props) {
