Repository: phoenix
Updated Branches:
  refs/heads/4.x-HBase-1.2 48bed8235 -> 8aff89599
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8aff8959/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index d4f4ae3..f77ed75 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -1179,14 +1179,12 @@ public class MetaDataClient {
                 // If the table is a view, then we will end up calling update stats
                 // here for all the view indexes on it. We take care of local indexes later.
                 if (index.getIndexType() != IndexType.LOCAL) {
-                    if (index.getIndexType() != IndexType.LOCAL) {
-                        if (table.getType() != PTableType.VIEW) {
-                            rowCount += updateStatisticsInternal(index.getPhysicalName(), index,
-                                    updateStatisticsStmt.getProps(), true);
-                        } else {
-                            rowCount += updateStatisticsInternal(table.getPhysicalName(), index,
-                                    updateStatisticsStmt.getProps(), true);
-                        }
+                    if (table.getType() != PTableType.VIEW) {
+                        rowCount += updateStatisticsInternal(index.getPhysicalName(), index,
+                                updateStatisticsStmt.getProps(), true);
+                    } else {
+                        rowCount += updateStatisticsInternal(table.getPhysicalName(), index,
+                                updateStatisticsStmt.getProps(), true);
                     }
                 }
             }
@@ -1548,6 +1546,11 @@ public class MetaDataClient {
                 throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_CREATE_INDEX_ON_MUTABLE_TABLE_WITH_ROWTIMESTAMP).setTableName(indexTableName.getTableName()).build().buildException();
             }
         }
+        if (dataTable.isTransactional()
+                && isLocalIndex
+                && dataTable.getTransactionProvider().getTransactionProvider().isUnsupported(PhoenixTransactionProvider.Feature.ALLOW_LOCAL_INDEX)) {
+            throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_CREATE_LOCAL_INDEX_FOR_TXN_TABLE).setMessage(dataTable.getTransactionProvider().name()).setTableName(indexTableName.getTableName()).build().buildException();
+        }
         int posOffset = 0;
         List<PColumn> pkColumns = dataTable.getPKColumns();
         Set<RowKeyColumnExpression> unusedPkColumns;
@@ -2128,10 +2131,20 @@ public class MetaDataClient {
                     .setSchemaName(schemaName).setTableName(tableName)
                     .build().buildException();
             }
+            if (TableProperty.TTL.getValue(commonFamilyProps) != null
+                    && transactionProvider != null
+                    && transactionProvider.getTransactionProvider().isUnsupported(PhoenixTransactionProvider.Feature.SET_TTL)) {
+                throw new SQLExceptionInfo.Builder(PhoenixTransactionProvider.Feature.SET_TTL.getCode())
+                    .setMessage(transactionProvider.name())
+                    .setSchemaName(schemaName)
+                    .setTableName(tableName)
+                    .build()
+                    .buildException();
+            }
             // Put potentially inferred value into tableProps as it's used by the createTable call below
             // to determine which coprocessors to install on the new table.
-            tableProps.put(PhoenixDatabaseMetaData.TRANSACTIONAL, transactionProvider != null);
+            tableProps.put(PhoenixDatabaseMetaData.TRANSACTION_PROVIDER, transactionProvider);
             if (transactionProvider != null) {
                 // TODO: for Omid
                 // If TTL set, use Tephra TTL property name instead
@@ -2355,32 +2368,61 @@ public class MetaDataClient {
         } else {
             Byte encodingSchemeSerializedByte = (Byte) TableProperty.COLUMN_ENCODED_BYTES.getValue(tableProps);
             if (encodingSchemeSerializedByte == null) {
-                encodingSchemeSerializedByte = (byte)connection.getQueryServices().getProps().getInt(QueryServices.DEFAULT_COLUMN_ENCODED_BYTES_ATRRIB, QueryServicesOptions.DEFAULT_COLUMN_ENCODED_BYTES);
-            }
-            encodingScheme = QualifierEncodingScheme.fromSerializedValue(encodingSchemeSerializedByte);
+                // Ignore default if transactional and column encoding is not supported (as with OMID)
+                if (transactionProvider == null || !transactionProvider.getTransactionProvider().isUnsupported(PhoenixTransactionProvider.Feature.COLUMN_ENCODING) ) {
+                    encodingSchemeSerializedByte = (byte)connection.getQueryServices().getProps().getInt(QueryServices.DEFAULT_COLUMN_ENCODED_BYTES_ATRRIB,
+                        QueryServicesOptions.DEFAULT_COLUMN_ENCODED_BYTES);
+                    encodingScheme = QualifierEncodingScheme.fromSerializedValue(encodingSchemeSerializedByte);
+                }
+            } else {
+                encodingScheme = QualifierEncodingScheme.fromSerializedValue(encodingSchemeSerializedByte);
+                if (encodingScheme != NON_ENCODED_QUALIFIERS && transactionProvider != null && transactionProvider.getTransactionProvider().isUnsupported(PhoenixTransactionProvider.Feature.COLUMN_ENCODING) ) {
+                    throw new SQLExceptionInfo.Builder(
+                        SQLExceptionCode.UNSUPPORTED_COLUMN_ENCODING_FOR_TXN_PROVIDER)
+                        .setSchemaName(schemaName).setTableName(tableName)
+                        .setMessage(transactionProvider.name())
+                        .build()
+                        .buildException();
+                }
+            }
             if (isImmutableRows) {
-                immutableStorageScheme =
+                ImmutableStorageScheme immutableStorageSchemeProp =
                     (ImmutableStorageScheme) TableProperty.IMMUTABLE_STORAGE_SCHEME
                         .getValue(tableProps);
-                if (immutableStorageScheme == null) {
-                    if (multiTenant) {
-                        immutableStorageScheme =
-                            ImmutableStorageScheme
-                                .valueOf(connection
-                                    .getQueryServices()
-                                    .getProps()
-                                    .get(
-                                        QueryServices.DEFAULT_MULTITENANT_IMMUTABLE_STORAGE_SCHEME_ATTRIB,
-                                        QueryServicesOptions.DEFAULT_MULTITENANT_IMMUTABLE_STORAGE_SCHEME));
-                    } else {
-                        immutableStorageScheme =
-                            ImmutableStorageScheme
-                                .valueOf(connection
-                                    .getQueryServices()
-                                    .getProps()
-                                    .get(
-                                        QueryServices.DEFAULT_IMMUTABLE_STORAGE_SCHEME_ATTRIB,
-                                        QueryServicesOptions.DEFAULT_IMMUTABLE_STORAGE_SCHEME));
+                if (immutableStorageSchemeProp == null) {
+                    // Ignore default if transactional and column encoding is not supported
+                    if (transactionProvider == null ||
+                            !transactionProvider.getTransactionProvider().isUnsupported(
+                                    PhoenixTransactionProvider.Feature.COLUMN_ENCODING) ) {
+                        if (multiTenant) {
+                            immutableStorageScheme =
+                                ImmutableStorageScheme
+                                    .valueOf(connection
+                                        .getQueryServices()
+                                        .getProps()
+                                        .get(
+                                            QueryServices.DEFAULT_MULTITENANT_IMMUTABLE_STORAGE_SCHEME_ATTRIB,
+                                            QueryServicesOptions.DEFAULT_MULTITENANT_IMMUTABLE_STORAGE_SCHEME));
+                        } else {
+                            immutableStorageScheme =
+                                ImmutableStorageScheme
+                                    .valueOf(connection
+                                        .getQueryServices()
+                                        .getProps()
+                                        .get(
+                                            QueryServices.DEFAULT_IMMUTABLE_STORAGE_SCHEME_ATTRIB,
+                                            QueryServicesOptions.DEFAULT_IMMUTABLE_STORAGE_SCHEME));
+                        }
+                    }
+                } else {
+                    immutableStorageScheme = immutableStorageSchemeProp;
+                    if (immutableStorageScheme != ONE_CELL_PER_COLUMN && transactionProvider != null
+                            && transactionProvider.getTransactionProvider().isUnsupported(PhoenixTransactionProvider.Feature.COLUMN_ENCODING) ) {
+                        throw new SQLExceptionInfo.Builder(
+                            SQLExceptionCode.UNSUPPORTED_STORAGE_FORMAT_FOR_TXN_PROVIDER)
+                            .setSchemaName(schemaName).setTableName(tableName)
+                            .setMessage(transactionProvider.name())
+                            .build()
+                            .buildException();
+                    }
                 }
                 if (immutableStorageScheme != ONE_CELL_PER_COLUMN
@@ -3135,6 +3177,7 @@ public class MetaDataClient {
             throws SQLException {
         return incrementTableSeqNum(table, expectedType, columnCountDelta,
                 metaPropertiesEvaluated.getIsTransactional(),
+                metaPropertiesEvaluated.getTransactionProvider(),
                 metaPropertiesEvaluated.getUpdateCacheFrequency(),
                 metaPropertiesEvaluated.getIsImmutableRows(),
                 metaPropertiesEvaluated.getDisableWAL(),
@@ -3147,13 +3190,14 @@ public class MetaDataClient {
     }

     private long incrementTableSeqNum(PTable table, PTableType expectedType, int columnCountDelta, Boolean isTransactional, Long updateCacheFrequency) throws SQLException {
-        return incrementTableSeqNum(table, expectedType, columnCountDelta, isTransactional, updateCacheFrequency, null, null, null, null, -1L, null, null, null);
+        return incrementTableSeqNum(table, expectedType, columnCountDelta, isTransactional, null, updateCacheFrequency, null, null, null, null, -1L, null, null, null);
     }

     private long incrementTableSeqNum(PTable table, PTableType expectedType, int columnCountDelta,
-            Boolean isTransactional, Long updateCacheFrequency, Boolean isImmutableRows, Boolean disableWAL,
-            Boolean isMultiTenant, Boolean storeNulls, Long guidePostWidth, Boolean appendOnlySchema, ImmutableStorageScheme immutableStorageScheme
-            , Boolean useStatsForParallelization)
+            Boolean isTransactional, TransactionFactory.Provider transactionProvider,
+            Long updateCacheFrequency, Boolean isImmutableRows, Boolean disableWAL,
+            Boolean isMultiTenant, Boolean storeNulls, Long guidePostWidth, Boolean appendOnlySchema,
+            ImmutableStorageScheme immutableStorageScheme, Boolean useStatsForParallelization)
             throws SQLException {
         String schemaName = table.getSchemaName().getString();
         String tableName = table.getTableName().getString();
@@ -3188,6 +3232,9 @@ public class MetaDataClient {
         if (isTransactional != null) {
             mutateBooleanProperty(tenantId, schemaName, tableName, TRANSACTIONAL, isTransactional);
         }
+        if (transactionProvider !=null) {
+            mutateByteProperty(tenantId, schemaName, tableName, TRANSACTION_PROVIDER, transactionProvider.getCode());
+        }
         if (updateCacheFrequency != null) {
             mutateLongProperty(tenantId, schemaName, tableName, UPDATE_CACHE_FREQUENCY, updateCacheFrequency);
         }
@@ -3244,6 +3291,27 @@ public class MetaDataClient {
         }
     }

+    private void mutateByteProperty(String tenantId, String schemaName, String tableName,
+            String propertyName, Byte propertyValue) throws SQLException {
+        String updatePropertySql = "UPSERT INTO " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_CATALOG_TABLE + "\"( " +
+                TENANT_ID + "," +
+                TABLE_SCHEM + "," +
+                TABLE_NAME + "," +
+                propertyName +
+                ") VALUES (?, ?, ?, ?)";
+        try (PreparedStatement tableBoolUpsert = connection.prepareStatement(updatePropertySql)) {
+            tableBoolUpsert.setString(1, tenantId);
+            tableBoolUpsert.setString(2, schemaName);
+            tableBoolUpsert.setString(3, tableName);
+            if (propertyValue == null) {
+                tableBoolUpsert.setNull(4, Types.TINYINT);
+            } else {
+                tableBoolUpsert.setByte(4, propertyValue);
+            }
+            tableBoolUpsert.execute();
+        }
+    }
+
     private void mutateStringProperty(String tenantId, String schemaName, String tableName, String propertyName, String propertyValue) throws SQLException {
         String updatePropertySql = "UPSERT INTO " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_CATALOG_TABLE + "\"( " +
@@ -3728,7 +3796,7 @@ public class MetaDataClient {
      */
     private static byte[] getNewEmptyColumnFamilyOrNull (PTable table, PColumn columnToDrop) {
         if (table.getType() != PTableType.VIEW && !SchemaUtil.isPKColumn(columnToDrop) && table.getColumnFamilies().get(0).getName().equals(columnToDrop.getFamilyName()) && table.getColumnFamilies().get(0).getColumns().size() == 1) {
-            return SchemaUtil.getEmptyColumnFamily(table.getDefaultFamilyName(), table.getColumnFamilies().subList(1, table.getColumnFamilies().size()));
+            return SchemaUtil.getEmptyColumnFamily(table.getDefaultFamilyName(), table.getColumnFamilies().subList(1, table.getColumnFamilies().size()), table.getIndexType() == IndexType.LOCAL);
         }
         // If unchanged, return null
         return null;
@@ -4533,8 +4601,8 @@ public class MetaDataClient {
                 connection.getQueryServices().getProps().get(
                     QueryServices.DEFAULT_TRANSACTION_PROVIDER_ATTRIB,
                     QueryServicesOptions.DEFAULT_TRANSACTION_PROVIDER));
-            metaPropertiesEvaluated.setTransactionProvider(provider);
         }
+        metaPropertiesEvaluated.setTransactionProvider(provider);
         if (provider.getTransactionProvider().isUnsupported(PhoenixTransactionProvider.Feature.ALTER_NONTX_TO_TX)) {
             throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_ALTER_TABLE_FROM_NON_TXN_TO_TXNL)
                 .setMessage(provider.name() + ". ")
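
The MetaDataClient changes above all follow one pattern: resolve the table's TransactionFactory.Provider, ask its PhoenixTransactionProvider whether the requested feature is unsupported, and throw the SQLExceptionCode attached to that Feature. A minimal sketch of that pattern; the helper below is hypothetical and not part of this patch:

    // Hypothetical helper illustrating the validation pattern used above.
    private static void requireSupported(TransactionFactory.Provider provider,
            PhoenixTransactionProvider.Feature feature,
            String schemaName, String tableName) throws SQLException {
        if (provider != null
                && provider.getTransactionProvider().isUnsupported(feature)) {
            // Each Feature carries the SQLExceptionCode to raise when violated.
            throw new SQLExceptionInfo.Builder(feature.getCode())
                .setMessage(provider.name())
                .setSchemaName(schemaName)
                .setTableName(tableName)
                .build()
                .buildException();
        }
    }
    // e.g. requireSupported(transactionProvider, Feature.SET_TTL, schemaName, tableName);
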
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8aff8959/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java
index d235d4b..9edc58b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java
@@ -19,122 +19,111 @@ package org.apache.phoenix.transaction;

 import java.sql.SQLException;

-import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.transaction.TransactionFactory.Provider;

+import com.google.protobuf.InvalidProtocolBufferException;
+
+//import org.apache.omid.tso.TSOMockModule;
+
 public class OmidTransactionContext implements PhoenixTransactionContext {

+    public OmidTransactionContext() {
+    }
+
+    public OmidTransactionContext(PhoenixConnection connection) throws SQLException {
+    }
+
+    public OmidTransactionContext(byte[] txnBytes) throws InvalidProtocolBufferException {
+    }
+
+    public OmidTransactionContext(PhoenixTransactionContext ctx, boolean subTask) {
+    }
+
     @Override
     public void begin() throws SQLException {
-        // TODO Auto-generated method stub
-
     }

     @Override
     public void commit() throws SQLException {
-        // TODO Auto-generated method stub
-
     }

     @Override
     public void abort() throws SQLException {
-        // TODO Auto-generated method stub
-
     }

     @Override
     public void checkpoint(boolean hasUncommittedData) throws SQLException {
-        // TODO Auto-generated method stub
-
     }

     @Override
     public void commitDDLFence(PTable dataTable) throws SQLException {
-        // TODO Auto-generated method stub
-
     }

     @Override
     public void join(PhoenixTransactionContext ctx) {
-        // TODO Auto-generated method stub
-
     }

     @Override
     public boolean isTransactionRunning() {
-        // TODO Auto-generated method stub
         return false;
     }

     @Override
     public void reset() {
-        // TODO Auto-generated method stub
-
     }

     @Override
     public long getTransactionId() {
-        // TODO Auto-generated method stub
         return 0;
     }

     @Override
     public long getReadPointer() {
-        // TODO Auto-generated method stub
         return 0;
     }

     @Override
     public long getWritePointer() {
-        // TODO Auto-generated method stub
         return 0;
     }

     @Override
     public PhoenixVisibilityLevel getVisibilityLevel() {
-        // TODO Auto-generated method stub
         return null;
     }

     @Override
     public void setVisibilityLevel(PhoenixVisibilityLevel visibilityLevel) {
-        // TODO Auto-generated method stub
-
     }

     @Override
     public byte[] encodeTransaction() throws SQLException {
-        // TODO Auto-generated method stub
         return null;
     }

     @Override
     public Provider getProvider() {
-        return Provider.OMID;
+        return TransactionFactory.Provider.OMID;
     }

     @Override
-    public PhoenixTransactionContext newTransactionContext(PhoenixTransactionContext contex, boolean subTask) {
+    public PhoenixTransactionContext newTransactionContext(PhoenixTransactionContext context, boolean subTask) {
         return null;
     }

     @Override
     public void markDMLFence(PTable dataTable) {
-        // TODO Auto-generated method stub
-
     }

     @Override
-    public HTableInterface getTransactionalTable(HTableInterface htable, boolean isImmutable) {
-        // TODO Auto-generated method stub
-        return null;
+    public Table getTransactionalTable(Table htable, boolean isConflictFree) throws SQLException {
+        return new OmidTransactionTable(this, htable, isConflictFree);
     }

     @Override
-    public HTableInterface getTransactionalTableWriter(PhoenixConnection connection, PTable table, HTableInterface htable, boolean isIndex) {
-        // TODO Auto-generated method stub
-        return null;
+    public Table getTransactionalTableWriter(PhoenixConnection connection, PTable table, Table htable, boolean isIndex) throws SQLException {
+        return new OmidTransactionTable(this, htable, table.isImmutableRows() || isIndex);
     }
 }
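
Both factory methods now return an OmidTransactionTable rather than null; the writer path treats immutable-row tables and index tables as conflict-free. A usage sketch, assuming the caller already holds a PhoenixConnection, the PTable metadata, and a raw HBase Table (variable names are illustrative):

    // Sketch: obtaining a transactional writer from the Omid context.
    static Table transactionalWriter(PhoenixConnection conn, PTable dataTable,
            Table rawTable) throws SQLException {
        PhoenixTransactionContext ctx = new OmidTransactionContext(conn);
        // isIndex == false here; immutable-row tables still skip conflict
        // detection via table.isImmutableRows() || isIndex above.
        return ctx.getTransactionalTableWriter(conn, dataTable, rawTable, false);
    }
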
"phoenix.omid.tso.conflict.map.size"; + public static final String OMID_TSO_TIMESTAMP_TYPE = "phoenix.omid.tso.timestamp.type"; + public static final int DEFAULT_OMID_TSO_CONFLICT_MAP_SIZE = 1000; + public static final String DEFAULT_OMID_TSO_TIMESTAMP_TYPE = "WORLD_TIME"; public static final OmidTransactionProvider getInstance() { return INSTANCE; @@ -36,33 +46,54 @@ public class OmidTransactionProvider implements PhoenixTransactionProvider { } @Override + public String toString() { + return getProvider().toString(); + } + + @Override public PhoenixTransactionContext getTransactionContext(byte[] txnBytes) throws IOException { - //return new OmidTransactionContext(txnBytes); - return null; + // Remove last byte (which is used to identify transaction provider) + return new OmidTransactionContext(Arrays.copyOf(txnBytes,txnBytes.length-1)); } @Override - public PhoenixTransactionContext getTransactionContext(PhoenixConnection connection) { - //return new OmidTransactionContext(connection); - return null; + public PhoenixTransactionContext getTransactionContext(PhoenixConnection connection) throws SQLException { + return new OmidTransactionContext(connection); } @Override - public PhoenixTransactionClient getTransactionClient(Configuration config, ConnectionInfo connectionInfo) { - // TODO Auto-generated method stub - return null; + public PhoenixTransactionClient getTransactionClient(Configuration config, ConnectionInfo connectionInfo) throws SQLException{ + return new OmidTransactionClient(); + } + + static class OmidTransactionClient implements PhoenixTransactionClient { + @Override + public void close() throws IOException {} } @Override - public PhoenixTransactionService getTransactionService(Configuration config, ConnectionInfo connectionInfo) { - // TODO Auto-generated method stub - return null; + public PhoenixTransactionService getTransactionService(Configuration config, ConnectionInfo connectionInfo) throws SQLException{ + return new OmidTransactionService(); + } + + static class OmidTransactionService implements PhoenixTransactionService { + + public void start() { + } + + @Override + public void close() throws IOException { + } } @Override public Class<? extends RegionObserver> getCoprocessor() { - // TODO Auto-generated method stub - return null; + return OmidTransactionalProcessor.class; + } + + @Override + public Class<? extends RegionObserver> getGCCoprocessor() { + return OmidGCProcessor.class; } @Override @@ -72,13 +103,11 @@ public class OmidTransactionProvider implements PhoenixTransactionProvider { @Override public boolean isUnsupported(Feature feature) { - // FIXME: if we initialize a Set with the unsupported features - // and check for containment, we run into a test failure - // in SetPropertyOnEncodedTableIT.testSpecifyingColumnFamilyForTTLFails() - // due to TableProperty.colFamSpecifiedException being null - // (though it's set in the constructor). I suspect some - // mysterious class loader issue. The below works fine - // as a workaround. 
- return (feature == Feature.ALTER_NONTX_TO_TX); + return true; + } + + @Override + public Put markPutAsCommitted(Put put, long timestamp, long commitTimestamp) { + return put; } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/8aff8959/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionTable.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionTable.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionTable.java new file mode 100644 index 0000000..cba1d56 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionTable.java @@ -0,0 +1,260 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.transaction; + +import java.io.IOException; +import java.sql.SQLException; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Append; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Durability; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Increment; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Row; +import org.apache.hadoop.hbase.client.RowMutations; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.coprocessor.Batch.Call; +import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback; +import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; +import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; + +import com.google.protobuf.Descriptors.MethodDescriptor; +import com.google.protobuf.Message; +import com.google.protobuf.Service; +import com.google.protobuf.ServiceException; + +public class OmidTransactionTable implements Table { + + public OmidTransactionTable() throws SQLException { + } + + public OmidTransactionTable(PhoenixTransactionContext ctx, Table hTable) throws SQLException { + } + + public OmidTransactionTable(PhoenixTransactionContext ctx, Table hTable, boolean isConflictFree) throws SQLException { + } + + @Override + public Result get(Get get) throws IOException { + return null; + } + + @Override + public void put(Put put) throws IOException { + } + + @Override + public void delete(Delete delete) throws IOException { + } + + @Override + public ResultScanner getScanner(Scan scan) throws IOException { + return null; + } + + @Override + public 
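
getTransactionContext(byte[]) drops the trailing byte because Phoenix serializes transaction state as the provider's payload followed by a single provider-code byte (1 = TEPHRA, 2 = OMID, per TransactionFactory further down); TransactionFactory.getTransactionProvider(byte[], int) dispatches on that same byte. A sketch of the convention, with hypothetical helper names:

    import java.util.Arrays;

    // Serialized tx state = provider payload + 1 trailing provider-code byte.
    static byte[] attachProviderCode(byte[] payload, byte providerCode) {
        byte[] txState = Arrays.copyOf(payload, payload.length + 1);
        txState[payload.length] = providerCode; // e.g. 2 for OMID
        return txState;
    }

    static byte[] stripProviderCode(byte[] txState) {
        return Arrays.copyOf(txState, txState.length - 1); // payload only
    }
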
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8aff8959/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionTable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionTable.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionTable.java
new file mode 100644
index 0000000..cba1d56
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionTable.java
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.transaction;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Row;
+import org.apache.hadoop.hbase.client.RowMutations;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.coprocessor.Batch.Call;
+import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
+
+import com.google.protobuf.Descriptors.MethodDescriptor;
+import com.google.protobuf.Message;
+import com.google.protobuf.Service;
+import com.google.protobuf.ServiceException;
+
+public class OmidTransactionTable implements Table {
+
+    public OmidTransactionTable() throws SQLException {
+    }
+
+    public OmidTransactionTable(PhoenixTransactionContext ctx, Table hTable) throws SQLException {
+    }
+
+    public OmidTransactionTable(PhoenixTransactionContext ctx, Table hTable, boolean isConflictFree) throws SQLException {
+    }
+
+    @Override
+    public Result get(Get get) throws IOException {
+        return null;
+    }
+
+    @Override
+    public void put(Put put) throws IOException {
+    }
+
+    @Override
+    public void delete(Delete delete) throws IOException {
+    }
+
+    @Override
+    public ResultScanner getScanner(Scan scan) throws IOException {
+        return null;
+    }
+
+    @Override
+    public Configuration getConfiguration() {
+        return null;
+    }
+
+    @Override
+    public HTableDescriptor getTableDescriptor() throws IOException {
+        return null;
+    }
+
+    @Override
+    public boolean exists(Get get) throws IOException {
+        return false;
+    }
+
+    @Override
+    public Result[] get(List<Get> gets) throws IOException {
+        return null;
+    }
+
+    @Override
+    public ResultScanner getScanner(byte[] family) throws IOException {
+        return null;
+    }
+
+    @Override
+    public ResultScanner getScanner(byte[] family, byte[] qualifier)
+            throws IOException {
+        return null;
+    }
+
+    @Override
+    public void put(List<Put> puts) throws IOException {
+    }
+
+    @Override
+    public void delete(List<Delete> deletes) throws IOException {
+    }
+
+    @Override
+    public void close() throws IOException {
+    }
+
+    @Override
+    public TableName getName() {
+        return null;
+    }
+
+    @Override
+    public boolean[] existsAll(List<Get> gets) throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void batch(List<? extends Row> actions, Object[] results)
+            throws IOException, InterruptedException {
+    }
+
+    @Override
+    public Object[] batch(List<? extends Row> actions) throws IOException,
+            InterruptedException {
+        return null;
+    }
+
+    @Override
+    public <R> void batchCallback(List<? extends Row> actions,
+            Object[] results, Callback<R> callback) throws IOException,
+            InterruptedException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public <R> Object[] batchCallback(List<? extends Row> actions,
+            Callback<R> callback) throws IOException, InterruptedException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier,
+            byte[] value, Put put) throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier,
+            CompareOp compareOp, byte[] value, Put put) throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
+            byte[] value, Delete delete) throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
+            CompareOp compareOp, byte[] value, Delete delete)
+            throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void mutateRow(RowMutations rm) throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Result append(Append append) throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Result increment(Increment increment) throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public long incrementColumnValue(byte[] row, byte[] family,
+            byte[] qualifier, long amount) throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public long incrementColumnValue(byte[] row, byte[] family,
+            byte[] qualifier, long amount, Durability durability)
+            throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public CoprocessorRpcChannel coprocessorService(byte[] row) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public <T extends Service, R> Map<byte[], R> coprocessorService(
+            Class<T> service, byte[] startKey, byte[] endKey,
+            Call<T, R> callable) throws ServiceException, Throwable {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public <T extends Service, R> void coprocessorService(Class<T> service,
+            byte[] startKey, byte[] endKey, Call<T, R> callable,
+            Callback<R> callback) throws ServiceException, Throwable {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public <R extends Message> Map<byte[], R> batchCoprocessorService(
+            MethodDescriptor methodDescriptor, Message request,
+            byte[] startKey, byte[] endKey, R responsePrototype)
+            throws ServiceException, Throwable {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public <R extends Message> void batchCoprocessorService(
+            MethodDescriptor methodDescriptor, Message request,
+            byte[] startKey, byte[] endKey, R responsePrototype,
+            Callback<R> callback) throws ServiceException, Throwable {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier,
+            CompareOp compareOp, byte[] value, RowMutations mutation)
+            throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public long getWriteBufferSize() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void setWriteBufferSize(long writeBufferSize) throws IOException {
+        throw new UnsupportedOperationException();
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8aff8959/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java
index dfa35be..1882004 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java
@@ -20,7 +20,7 @@ package org.apache.phoenix.transaction;
 import java.sql.SQLException;
 import java.util.concurrent.TimeoutException;

-import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.transaction.TransactionFactory.Provider;
@@ -106,12 +106,12 @@ public interface PhoenixTransactionContext {
         }

         @Override
-        public HTableInterface getTransactionalTable(HTableInterface htable, boolean isImmutable) {
+        public Table getTransactionalTable(Table htable, boolean isConflictFree) {
             return null;
         }

         @Override
-        public HTableInterface getTransactionalTableWriter(PhoenixConnection connection, PTable table, HTableInterface htable, boolean isIndex) {
+        public Table getTransactionalTableWriter(PhoenixConnection connection, PTable table, Table htable, boolean isIndex) {
             return null;
         }
     };
@@ -230,6 +230,6 @@ public interface PhoenixTransactionContext {
     public Provider getProvider();
     public PhoenixTransactionContext newTransactionContext(PhoenixTransactionContext contex, boolean subTask);

-    public HTableInterface getTransactionalTable(HTableInterface htable, boolean isImmutable);
-    public HTableInterface getTransactionalTableWriter(PhoenixConnection connection, PTable table, HTableInterface htable, boolean isIndex) throws SQLException;
+    public Table getTransactionalTable(Table htable, boolean isConflictFree) throws SQLException;
+    public Table getTransactionalTableWriter(PhoenixConnection connection, PTable table, Table htable, boolean isIndex) throws SQLException;
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8aff8959/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionProvider.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionProvider.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionProvider.java
index cdc6058..b7f660e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionProvider.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionProvider.java
@@ -18,8 +18,10 @@ package org.apache.phoenix.transaction;

 import java.io.IOException;
+import java.sql.SQLException;

 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.coprocessor.RegionObserver;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.jdbc.PhoenixConnection;
@@ -27,7 +29,12 @@ import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo;

 public interface PhoenixTransactionProvider {
     public enum Feature {
-        ALTER_NONTX_TO_TX(SQLExceptionCode.CANNOT_ALTER_TABLE_FROM_NON_TXN_TO_TXNL);
+        ALTER_NONTX_TO_TX(SQLExceptionCode.CANNOT_ALTER_TABLE_FROM_NON_TXN_TO_TXNL),
+        COLUMN_ENCODING(SQLExceptionCode.UNSUPPORTED_COLUMN_ENCODING_FOR_TXN_PROVIDER),
+        MAINTAIN_LOCAL_INDEX_ON_SERVER(null),
+        SET_TTL(SQLExceptionCode.TTL_UNSUPPORTED_FOR_TXN_TABLE),
+        ALLOW_LOCAL_INDEX(SQLExceptionCode.CANNOT_CREATE_LOCAL_INDEX_FOR_TXN_TABLE)
+        ;

         private final SQLExceptionCode code;
@@ -40,12 +47,23 @@ public interface PhoenixTransactionProvider {
         }
     }
     public PhoenixTransactionContext getTransactionContext(byte[] txnBytes) throws IOException;
-    public PhoenixTransactionContext getTransactionContext(PhoenixConnection connection);
-
-    public PhoenixTransactionClient getTransactionClient(Configuration config, ConnectionInfo connectionInfo);
-    public PhoenixTransactionService getTransactionService(Configuration config, ConnectionInfo connectionInfo);
+    public PhoenixTransactionContext getTransactionContext(PhoenixConnection connection) throws SQLException;
+
+    public PhoenixTransactionClient getTransactionClient(Configuration config, ConnectionInfo connectionInfo) throws SQLException;
+    public PhoenixTransactionService getTransactionService(Configuration config, ConnectionInfo connectionInfo) throws SQLException;

     public Class<? extends RegionObserver> getCoprocessor();
-
+    public Class<? extends RegionObserver> getGCCoprocessor();
+
     public TransactionFactory.Provider getProvider();
     public boolean isUnsupported(Feature feature);
+
+    /**
+     * Converts put operation to autocommit operation
+     * @param put put operation
+     * @param timestamp - start timestamp
+     * @param commitTimestamp - commit timestamp
+     * @return put operation with metadata
+     * @throws IOException
+     */
+    public Put markPutAsCommitted(Put put, long timestamp, long commitTimestamp);
 }
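
With Omid's isUnsupported() now returning true for every feature, the Set-based form described in the FIXME removed from OmidTransactionProvider would also behave identically; a sketch of that alternative (not the patch's code — the removed comment notes the set-based form once tripped a suspected class-loader issue in SetPropertyOnEncodedTableIT):

    import java.util.EnumSet;
    import org.apache.phoenix.transaction.PhoenixTransactionProvider.Feature;

    // Sketch: declaring unsupported features as an EnumSet instead of
    // per-feature comparisons or a blanket "return true".
    private static final EnumSet<Feature> UNSUPPORTED = EnumSet.of(
        Feature.ALTER_NONTX_TO_TX, Feature.COLUMN_ENCODING,
        Feature.MAINTAIN_LOCAL_INDEX_ON_SERVER, Feature.SET_TTL,
        Feature.ALLOW_LOCAL_INDEX);

    @Override
    public boolean isUnsupported(Feature feature) {
        return UNSUPPORTED.contains(feature); // equivalent to "return true" for Omid
    }
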
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8aff8959/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java
index 8942363..18a05ce 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java
@@ -28,10 +28,9 @@ import java.util.concurrent.TimeoutException;

 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
@@ -84,7 +83,7 @@ public class TephraTransactionContext implements PhoenixTransactionContext {
         this.tx = CODEC.decode(txnBytes);
     }

-    public TephraTransactionContext(PhoenixConnection connection) {
+    public TephraTransactionContext(PhoenixConnection connection) throws SQLException {
         PhoenixTransactionClient client = connection.getQueryServices().initTransactionClient(getProvider());
         assert (client instanceof TephraTransactionClient);
         this.txServiceClient = ((TephraTransactionClient)client).getTransactionClient();
@@ -224,6 +223,12 @@ public class TephraTransactionContext implements PhoenixTransactionContext {
                 .setSchemaName(dataTable.getSchemaName().getString())
                 .setTableName(dataTable.getTableName().getString()).build()
                 .buildException();
+        } finally {
+            // The client expects a transaction to be in progress on the txContext while the
+            // VisibilityFence.prepareWait() starts a new tx and finishes/aborts it. After it's
+            // finished, we start a new one here.
+            // TODO: seems like an autonomous tx capability in Tephra would be useful here.
+            this.begin();
         }
     }
@@ -404,78 +409,27 @@ public class TephraTransactionContext implements PhoenixTransactionContext {
         return new TephraTransactionContext(context, subTask);
     }

-    public static class TransactionAwareHTableDelegate extends TransactionAwareHTable implements HTableInterface {
-        private final HTableInterface delegate;
-
-        public TransactionAwareHTableDelegate(HTableInterface hTable, TxConstants.ConflictDetection conflictLevel) {
-            super(hTable, conflictLevel);
-            delegate = hTable;
-        }
-
-        @Override
-        public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount, boolean writeToWAL)
-                throws IOException {
-            return delegate.incrementColumnValue(row, family, qualifier, amount, writeToWAL);
-        }
-
-        @Override
-        public Boolean[] exists(List<Get> gets) throws IOException {
-            return delegate.exists(gets);
-        }
-
-        @Override
-        public void setAutoFlush(boolean autoFlush) {
-            delegate.setAutoFlush(autoFlush);
-        }
-
-        @Override
-        public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) {
-            delegate.setAutoFlush(autoFlush, clearBufferOnFail);
-        }
-
-        @Override
-        public void setAutoFlushTo(boolean autoFlush) {
-            delegate.setAutoFlushTo(autoFlush);
-        }
-
-        @Override
-        public boolean isAutoFlush() {
-            return delegate.isAutoFlush();
-        }
-
-        @Override
-        public void flushCommits() throws IOException {
-            delegate.flushCommits();
-        }
-
-        @Override
-        public Result getRowOrBefore(byte[] row, byte[] family) throws IOException {
-            return delegate.getRowOrBefore(row, family);
-        }
-
-    }
-
     @Override
-    public HTableInterface getTransactionalTable(HTableInterface htable, boolean isImmutable) {
-        TransactionAwareHTableDelegate transactionAwareHTable = new TransactionAwareHTableDelegate(htable, isImmutable ? TxConstants.ConflictDetection.NONE : TxConstants.ConflictDetection.ROW);
+    public Table getTransactionalTable(Table htable, boolean isConflictFree) {
+        TransactionAwareHTable transactionAwareHTable = new TransactionAwareHTable(htable, isConflictFree ? TxConstants.ConflictDetection.NONE : TxConstants.ConflictDetection.ROW);
         this.addTransactionAware(transactionAwareHTable);
         return transactionAwareHTable;
     }

     @Override
-    public HTableInterface getTransactionalTableWriter(PhoenixConnection connection, PTable table, HTableInterface htable, boolean isIndex) throws SQLException {
+    public Table getTransactionalTableWriter(PhoenixConnection connection, PTable table, Table htable, boolean isIndex) throws SQLException {
         // If we have indexes, wrap the HTable in a delegate HTable that
         // will attach the necessary index meta data in the event of a
         // rollback
-        TransactionAwareHTableDelegate transactionAwareHTable;
+        TransactionAwareHTable transactionAwareHTable;
         // Don't add immutable indexes (those are the only ones that would participate
         // during a commit), as we don't need conflict detection for these.
         if (isIndex) {
-            transactionAwareHTable = new TransactionAwareHTableDelegate(htable, TxConstants.ConflictDetection.NONE);
+            transactionAwareHTable = new TransactionAwareHTable(htable, TxConstants.ConflictDetection.NONE);
             transactionAwareHTable.startTx(getTransaction());
         } else {
             htable = new RollbackHookHTableWrapper(htable, table, connection);
-            transactionAwareHTable = new TransactionAwareHTableDelegate(htable, table.isImmutableRows() ? TxConstants.ConflictDetection.NONE : TxConstants.ConflictDetection.ROW);
+            transactionAwareHTable = new TransactionAwareHTable(htable, table.isImmutableRows() ? TxConstants.ConflictDetection.NONE : TxConstants.ConflictDetection.ROW);
             // Even for immutable, we need to do this so that an abort has the state
             // necessary to generate the rows to delete.
             this.addTransactionAware(transactionAwareHTable);
@@ -497,7 +451,7 @@ public class TephraTransactionContext implements PhoenixTransactionContext {
         private final PTable table;
         private final PhoenixConnection connection;

-        private RollbackHookHTableWrapper(HTableInterface delegate, PTable table, PhoenixConnection connection) {
+        private RollbackHookHTableWrapper(Table delegate, PTable table, PhoenixConnection connection) {
             super(delegate);
             this.table = table;
             this.connection = connection;
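
The isConflictFree flag maps directly onto Tephra's conflict-detection levels: conflict-free writes (immutable rows, index rows) skip write-write conflict detection entirely, and everything else detects at row granularity. The mapping, spelled out against the Tephra API as a standalone sketch:

    import org.apache.tephra.TxConstants;

    static TxConstants.ConflictDetection conflictLevel(boolean isConflictFree) {
        // Immutable rows and index rows never contend for the same cells,
        // so conflict detection can be skipped for them.
        return isConflictFree ? TxConstants.ConflictDetection.NONE
                              : TxConstants.ConflictDetection.ROW;
    }
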
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8aff8959/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionProvider.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionProvider.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionProvider.java
index 2e52efa..70937cf 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionProvider.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionProvider.java
@@ -18,10 +18,12 @@ package org.apache.phoenix.transaction;

 import java.io.IOException;
+import java.sql.SQLException;
 import java.util.concurrent.TimeUnit;

 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.coprocessor.RegionObserver;
 import org.apache.phoenix.coprocessor.TephraTransactionalProcessor;
 import org.apache.phoenix.jdbc.PhoenixConnection;
@@ -59,12 +61,17 @@ public class TephraTransactionProvider implements PhoenixTransactionProvider {
     }

     @Override
+    public String toString() {
+        return getProvider().toString();
+    }
+
+    @Override
     public PhoenixTransactionContext getTransactionContext(byte[] txnBytes) throws IOException {
         return new TephraTransactionContext(txnBytes);
     }

     @Override
-    public PhoenixTransactionContext getTransactionContext(PhoenixConnection connection) {
+    public PhoenixTransactionContext getTransactionContext(PhoenixConnection connection) throws SQLException {
         return new TephraTransactionContext(connection);
     }

@@ -185,6 +192,9 @@ public class TephraTransactionProvider implements PhoenixTransactionProvider {
     }

     @Override
+    public Class<? extends RegionObserver> getGCCoprocessor() {return null;}
+
+    @Override
     public Provider getProvider() {
         return TransactionFactory.Provider.TEPHRA;
     }
@@ -194,4 +204,8 @@ public class TephraTransactionProvider implements PhoenixTransactionProvider {
         return false;
     }

+    @Override
+    public Put markPutAsCommitted(Put put, long timestamp, long commitTimestamp) {
+        return put;
+    }
 }
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8aff8959/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionFactory.java
index 62bd808..3e9182f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionFactory.java
@@ -24,15 +24,17 @@ import org.apache.phoenix.coprocessor.MetaDataProtocol;

 public class TransactionFactory {
     public enum Provider {
-        TEPHRA((byte)1, TephraTransactionProvider.getInstance()),
-        OMID((byte)2, OmidTransactionProvider.getInstance());
+        TEPHRA((byte)1, TephraTransactionProvider.getInstance(), true),
+        OMID((byte)2, OmidTransactionProvider.getInstance(), false);

         private final byte code;
         private final PhoenixTransactionProvider provider;
+        private final boolean runTests;

-        Provider(byte code, PhoenixTransactionProvider provider) {
+        Provider(byte code, PhoenixTransactionProvider provider, boolean runTests) {
             this.code = code;
             this.provider = provider;
+            this.runTests = runTests;
         }

         public byte getCode() {
@@ -53,19 +55,31 @@ public class TransactionFactory {
         public PhoenixTransactionProvider getTransactionProvider() {
             return provider;
         }
+
+        public boolean runTests() {
+            return runTests;
+        }
     }

     public static PhoenixTransactionProvider getTransactionProvider(Provider provider) {
         return provider.getTransactionProvider();
     }

-    public static PhoenixTransactionContext getTransactionContext(byte[] txState, int clientVersion) throws IOException {
+    public static PhoenixTransactionProvider getTransactionProvider(byte[] txState, int clientVersion) {
         if (txState == null || txState.length == 0) {
             return null;
         }
         Provider provider = (clientVersion < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_14_0)
-            ? Provider.OMID
+            ? Provider.TEPHRA
             : Provider.fromCode(txState[txState.length-1]);
-        return provider.getTransactionProvider().getTransactionContext(txState);
+        return provider.getTransactionProvider();
+    }
+
+    public static PhoenixTransactionContext getTransactionContext(byte[] txState, int clientVersion) throws IOException {
+        PhoenixTransactionProvider provider = getTransactionProvider(txState, clientVersion);
+        if (provider == null) {
+            return null;
+        }
+        return provider.getTransactionContext(txState);
     }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8aff8959/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
index b63b053..9010f5f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
@@ -119,6 +119,7 @@ import org.apache.phoenix.schema.types.PDecimal;
 import org.apache.phoenix.schema.types.PLong;
 import org.apache.phoenix.schema.types.PVarbinary;
 import org.apache.phoenix.schema.types.PVarchar;
+import org.apache.phoenix.transaction.PhoenixTransactionProvider.Feature;

 import com.google.common.collect.Lists;

@@ -834,8 +835,11 @@ public class IndexUtil {
     public static List<PTable> getClientMaintainedIndexes(PTable table) {
         Iterator<PTable> indexIterator = // Only maintain tables with immutable rows through this client-side mechanism
-                (table.isImmutableRows() || table.isTransactional()) ? IndexMaintainer.maintainedGlobalIndexes(table
-                        .getIndexes().iterator()) : Collections.<PTable> emptyIterator();
+                (table.isTransactional() && table.getTransactionProvider().getTransactionProvider().isUnsupported(Feature.MAINTAIN_LOCAL_INDEX_ON_SERVER)) ?
+                     IndexMaintainer.maintainedIndexes(table.getIndexes().iterator()) :
+                (table.isImmutableRows() || table.isTransactional()) ?
+                     IndexMaintainer.maintainedGlobalIndexes(table.getIndexes().iterator()) :
+                     Collections.<PTable>emptyIterator();
         return Lists.newArrayList(indexIterator);
     }
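
The rewritten ternary in getClientMaintainedIndexes packs three cases into one expression; spelled out as plain control flow (a readable restatement of the same logic, not the patch's code):

    Iterator<PTable> indexIterator;
    if (table.isTransactional()
            && table.getTransactionProvider().getTransactionProvider()
                   .isUnsupported(Feature.MAINTAIN_LOCAL_INDEX_ON_SERVER)) {
        // The provider cannot maintain local indexes on the server side, so
        // the client maintains local and global indexes alike.
        indexIterator = IndexMaintainer.maintainedIndexes(table.getIndexes().iterator());
    } else if (table.isImmutableRows() || table.isTransactional()) {
        // Only global indexes are maintained through this client-side mechanism.
        indexIterator = IndexMaintainer.maintainedGlobalIndexes(table.getIndexes().iterator());
    } else {
        indexIterator = Collections.<PTable>emptyIterator();
    }
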
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8aff8959/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
index 48d2a5c..ec2eb14 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
@@ -500,23 +500,23 @@ public class SchemaUtil {
         return isString ? ("'" + type.toObject(value).toString() + "'") : type.toObject(value, offset, length).toString();
     }

-    public static byte[] getEmptyColumnFamily(PName defaultColumnFamily, List<PColumnFamily> families) {
-        return families.isEmpty() ? defaultColumnFamily == null ? QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES : defaultColumnFamily.getBytes() : families.get(0).getName().getBytes();
+    public static byte[] getEmptyColumnFamily(PName defaultColumnFamily, List<PColumnFamily> families, boolean isLocalIndex) {
+        return families.isEmpty() ? defaultColumnFamily == null ? (isLocalIndex ? QueryConstants.DEFAULT_LOCAL_INDEX_COLUMN_FAMILY_BYTES : QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES) : defaultColumnFamily.getBytes() : families.get(0).getName().getBytes();
     }

     public static byte[] getEmptyColumnFamily(PTable table) {
         List<PColumnFamily> families = table.getColumnFamilies();
-        return families.isEmpty() ? table.getDefaultFamilyName() == null ? QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES : table.getDefaultFamilyName().getBytes() : families.get(0).getName().getBytes();
+        return families.isEmpty() ? table.getDefaultFamilyName() == null ? (table.getIndexType() == IndexType.LOCAL ? QueryConstants.DEFAULT_LOCAL_INDEX_COLUMN_FAMILY_BYTES : QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES) : table.getDefaultFamilyName().getBytes() : families.get(0).getName().getBytes();
     }

     public static String getEmptyColumnFamilyAsString(PTable table) {
         List<PColumnFamily> families = table.getColumnFamilies();
-        return families.isEmpty() ? table.getDefaultFamilyName() == null ? QueryConstants.DEFAULT_COLUMN_FAMILY : table.getDefaultFamilyName().getString() : families.get(0).getName().getString();
+        return families.isEmpty() ? table.getDefaultFamilyName() == null ? (table.getIndexType() == IndexType.LOCAL ? QueryConstants.DEFAULT_LOCAL_INDEX_COLUMN_FAMILY : QueryConstants.DEFAULT_COLUMN_FAMILY) : table.getDefaultFamilyName().getString() : families.get(0).getName().getString();
     }

     public static ImmutableBytesPtr getEmptyColumnFamilyPtr(PTable table) {
         List<PColumnFamily> families = table.getColumnFamilies();
-        return families.isEmpty() ? table.getDefaultFamilyName() == null ? QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES_PTR : table.getDefaultFamilyName().getBytesPtr() : families.get(0)
+        return families.isEmpty() ? table.getDefaultFamilyName() == null ? (table.getIndexType() == IndexType.LOCAL ? QueryConstants.DEFAULT_LOCAL_INDEX_COLUMN_FAMILY_BYTES_PTR : QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES_PTR) : table.getDefaultFamilyName().getBytesPtr() : families.get(0)
                 .getName().getBytesPtr();
     }
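
All four SchemaUtil methods apply the same precedence, now with a local-index branch at the end; unrolled for readability (equivalent to the one-liner above):

    public static byte[] getEmptyColumnFamily(PName defaultColumnFamily,
            List<PColumnFamily> families, boolean isLocalIndex) {
        if (!families.isEmpty()) {
            return families.get(0).getName().getBytes(); // first declared family wins
        }
        if (defaultColumnFamily != null) {
            return defaultColumnFamily.getBytes();       // then the table's default family
        }
        // Finally, fall back to the built-in default; local index rows live
        // in their own default column family.
        return isLocalIndex ? QueryConstants.DEFAULT_LOCAL_INDEX_COLUMN_FAMILY_BYTES
                            : QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES;
    }
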
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8aff8959/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/CoveredColumnIndexCodec.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/CoveredColumnIndexCodec.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/CoveredColumnIndexCodec.java
index 83d05f3..1825f04 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/CoveredColumnIndexCodec.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/CoveredColumnIndexCodec.java
@@ -52,12 +52,12 @@ public class CoveredColumnIndexCodec extends BaseIndexCodec {
     }

     @Override
-    public void initialize(Configuration conf, byte[] regionStartKey, byte[] regionEndKey, byte[] tableName) {
+    public void initialize(Configuration conf, byte[] tableName) {
         groups = CoveredColumnIndexSpecifierBuilder.getColumns(conf);
     }

     @Override
-    public Iterable<IndexUpdate> getIndexUpserts(TableState state, IndexMetaData indexMetaData) {
+    public Iterable<IndexUpdate> getIndexUpserts(TableState state, IndexMetaData indexMetaData, byte[] regionStartKey, byte[] regionEndKey) {
         List<IndexUpdate> updates = new ArrayList<IndexUpdate>(groups.size());
         for (ColumnGroup group : groups) {
             IndexUpdate update = getIndexUpdateForGroup(group, state, indexMetaData);
@@ -111,7 +111,7 @@ public class CoveredColumnIndexCodec extends BaseIndexCodec {
     }

     @Override
-    public Iterable<IndexUpdate> getIndexDeletes(TableState state, IndexMetaData context) {
+    public Iterable<IndexUpdate> getIndexDeletes(TableState state, IndexMetaData context, byte[] regionStartKey, byte[] regionEndKey) {
         List<IndexUpdate> deletes = new ArrayList<IndexUpdate>(groups.size());
         for (ColumnGroup group : groups) {
             deletes.add(getDeleteForGroup(group, state, context));

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8aff8959/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/CoveredIndexCodecForTesting.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/CoveredIndexCodecForTesting.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/CoveredIndexCodecForTesting.java
index 204b1a0..665d2bf 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/CoveredIndexCodecForTesting.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/CoveredIndexCodecForTesting.java
@@ -48,17 +48,17 @@ public class CoveredIndexCodecForTesting extends BaseIndexCodec {
     }

     @Override
-    public Iterable<IndexUpdate> getIndexDeletes(TableState state, IndexMetaData context) {
+    public Iterable<IndexUpdate> getIndexDeletes(TableState state, IndexMetaData context, byte[] regionStartKey, byte[] regionEndKey) {
         return this.deletes;
     }

     @Override
-    public Iterable<IndexUpdate> getIndexUpserts(TableState state, IndexMetaData context) {
+    public Iterable<IndexUpdate> getIndexUpserts(TableState state, IndexMetaData context, byte[] regionStartKey, byte[] regionEndKey) {
         return this.updates;
     }

     @Override
-    public void initialize(Configuration conf, byte[] regionStartKey, byte[] regionEndKey, byte[] tableName) {
+    public void initialize(Configuration conf, byte[] tableName) {
         // noop
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8aff8959/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestCoveredColumnIndexCodec.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestCoveredColumnIndexCodec.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestCoveredColumnIndexCodec.java
index d63dd6b..e0a8ebe 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestCoveredColumnIndexCodec.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestCoveredColumnIndexCodec.java
@@ -177,14 +177,14 @@ public class TestCoveredColumnIndexCodec {

     // check the codec for deletes it should send
     LocalTableState state = new LocalTableState(table, p);
-    Iterable<IndexUpdate> updates = codec.getIndexDeletes(state, IndexMetaData.NULL_INDEX_META_DATA);
+    Iterable<IndexUpdate> updates = codec.getIndexDeletes(state, IndexMetaData.NULL_INDEX_META_DATA, null, null);
     assertFalse("Found index updates without any existing kvs in table!", updates.iterator().next()
         .isValid());

     // get the updates with the pending update
     state.setCurrentTimestamp(1);
     state.addPendingUpdates(kvs);
-    updates = codec.getIndexUpserts(state, IndexMetaData.NULL_INDEX_META_DATA);
+    updates = codec.getIndexUpserts(state, IndexMetaData.NULL_INDEX_META_DATA, null, null);
     assertTrue("Didn't find index updates for pending primary table update!", updates.iterator()
         .hasNext());
     for (IndexUpdate update : updates) {
@@ -207,7 +207,7 @@ public class TestCoveredColumnIndexCodec {
     state = new LocalTableState(table, d);
     state.setCurrentTimestamp(2);
     // check the cleanup of the current table, after the puts (mocking a 'next' update)
-    updates = codec.getIndexDeletes(state, IndexMetaData.NULL_INDEX_META_DATA);
+    updates = codec.getIndexDeletes(state, IndexMetaData.NULL_INDEX_META_DATA, null, null);
     for (IndexUpdate update : updates) {
       assertTrue("Didn't have any index cleanup, even though there is current state",
         update.isValid());
@@ -237,7 +237,7 @@ public class TestCoveredColumnIndexCodec {
     state.setCurrentTimestamp(d.getTimeStamp());
     // now we shouldn't see anything when getting the index update
     state.addPendingUpdates(d.getFamilyMap().get(FAMILY));
-    Iterable<IndexUpdate> updates = codec.getIndexUpserts(state, IndexMetaData.NULL_INDEX_META_DATA);
+    Iterable<IndexUpdate> updates = codec.getIndexUpserts(state, IndexMetaData.NULL_INDEX_META_DATA, null, null);
     for (IndexUpdate update : updates) {
       assertFalse("Had some index updates, though it should have been covered by the delete",
         update.isValid());
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8aff8959/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java b/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java
index e279074..841abb6 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java
@@ -22,6 +22,7 @@ import static org.apache.phoenix.query.QueryServicesOptions.withDefaults;

 import org.apache.curator.shaded.com.google.common.io.Files;
 import org.apache.hadoop.hbase.regionserver.wal.IndexedWALEditCodec;
+import org.apache.phoenix.transaction.OmidTransactionProvider;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.ReadOnlyProps;
 import org.apache.tephra.TxConstants;
@@ -124,7 +125,7 @@ public final class QueryServicesTestImpl extends BaseQueryServicesImpl {
             .setDefaultIndexPopulationWaitTime(DEFAULT_INDEX_POPULATION_WAIT_TIME)
             .setIndexRebuildTaskInitialDelay(DEFAULT_INDEX_REBUILD_TASK_INITIAL_DELAY)
             .set(AGGREGATE_CHUNK_SIZE_INCREASE_ATTRIB, DEFAULT_AGGREGATE_CHUNK_SIZE_INCREASE)
-            // setup default configs for Tephra
+            // setup default test configs for Tephra
             .set(TxConstants.Manager.CFG_DO_PERSIST, false)
            .set(TxConstants.Service.CFG_DATA_TX_CLIENT_RETRY_STRATEGY, "n-times")
            .set(TxConstants.Service.CFG_DATA_TX_CLIENT_ATTEMPTS, 1)
@@ -132,6 +133,8 @@ public final class QueryServicesTestImpl extends BaseQueryServicesImpl {
            .set(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR, Files.createTempDir().getAbsolutePath())
            .set(TxConstants.Manager.CFG_TX_TIMEOUT, DEFAULT_TXN_TIMEOUT_SECONDS)
            .set(TxConstants.Manager.CFG_TX_SNAPSHOT_INTERVAL, 5L)
+            // setup default test configs for Omid
+            .set(OmidTransactionProvider.OMID_TSO_PORT, Networks.getRandomPort())
            ;
     }
"_TXN" : "_NON_TXN"); - return tableNameBuilder.toString(); - } - public static ClientAggregators getSingleSumAggregator(String url, Properties props) throws SQLException { try (PhoenixConnection pconn = DriverManager.getConnection(url, props).unwrap(PhoenixConnection.class)) { PhoenixStatement statement = new PhoenixStatement(pconn); @@ -803,7 +796,7 @@ public class TestUtil { if (table.isTransactional()) { mutationState.startTransaction(table.getTransactionProvider()); } - try (HTableInterface htable = mutationState.getHTable(table)) { + try (Table htable = mutationState.getHTable(table)) { byte[] markerRowKey = Bytes.toBytes("TO_DELETE"); Put put = new Put(markerRowKey); @@ -1088,4 +1081,28 @@ public class TestUtil { } assertTrue(!rs.next()); } + + public static Collection<Object[]> filterTxParamData(Collection<Object[]> data, int index) { + boolean runAllTests = true; + boolean runNoTests = true; + + for (TransactionFactory.Provider provider : TransactionFactory.Provider.values()) { + runAllTests &= provider.runTests(); + runNoTests &= !provider.runTests(); + } + if (runNoTests) { + return Collections.emptySet(); + } + if (runAllTests) { + return data; + } + List<Object[]> filteredData = Lists.newArrayListWithExpectedSize(data.size()); + for (Object[] params : data) { + String provider = (String)params[index]; + if (provider == null || TransactionFactory.Provider.valueOf(provider).runTests()) { + filteredData.add(params); + } + } + return filteredData; + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/8aff8959/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index d7dd182..0aaedbd 100644 --- a/pom.xml +++ b/pom.xml @@ -100,7 +100,7 @@ <joni.version>2.1.2</joni.version> <avatica.version>1.12.0</avatica.version> <jettyVersion>8.1.7.v20120910</jettyVersion> - <tephra.version>0.14.0-incubating</tephra.version> + <tephra.version>0.15.0-incubating</tephra.version> <spark.version>2.0.2</spark.version> <scala.version>2.11.8</scala.version> <scala.binary.version>2.11</scala.binary.version>
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8aff8959/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index d7dd182..0aaedbd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -100,7 +100,7 @@
     <joni.version>2.1.2</joni.version>
     <avatica.version>1.12.0</avatica.version>
     <jettyVersion>8.1.7.v20120910</jettyVersion>
-    <tephra.version>0.14.0-incubating</tephra.version>
+    <tephra.version>0.15.0-incubating</tephra.version>
     <spark.version>2.0.2</spark.version>
     <scala.version>2.11.8</scala.version>
     <scala.binary.version>2.11</scala.binary.version>