http://git-wip-us.apache.org/repos/asf/phoenix/blob/d720f39e/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java index 7357ef3..450a99f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java @@ -109,7 +109,6 @@ import org.apache.phoenix.schema.types.PVarbinary; import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.ExpressionUtil; import org.apache.phoenix.util.IndexUtil; -import org.apache.phoenix.util.MetaDataUtil; import org.apache.phoenix.util.ScanUtil; import org.apache.phoenix.util.SchemaUtil; @@ -541,7 +540,11 @@ public class UpsertCompiler { // Disable running upsert select on server side if a table has global mutable secondary indexes on it boolean hasGlobalMutableIndexes = SchemaUtil.hasGlobalIndex(table) && !table.isImmutableRows(); boolean hasWhereSubquery = select.getWhere() != null && select.getWhere().hasSubquery(); - runOnServer = (sameTable || (serverUpsertSelectEnabled && !hasGlobalMutableIndexes)) && isAutoCommit && !table.isTransactional() + runOnServer = (sameTable || (serverUpsertSelectEnabled && !hasGlobalMutableIndexes)) && isAutoCommit + // We can run the upsert select for initial index population on server side for transactional + // tables since the writes do not need to be done transactionally, since we gate the index + // usage on successfully writing all data rows. + && (!table.isTransactional() || table.getType() == PTableType.INDEX) && !(table.isImmutableRows() && !table.getIndexes().isEmpty()) && !select.isJoin() && !hasWhereSubquery && table.getRowTimestampColPos() == -1; } @@ -1039,12 +1042,15 @@ public class UpsertCompiler { byte[] txState = table.isTransactional() ? connection.getMutationState().encodeTransaction() : ByteUtil.EMPTY_BYTE_ARRAY; + ScanUtil.setClientVersion(scan, MetaDataProtocol.PHOENIX_VERSION); + if (aggPlan.getTableRef().getTable().isTransactional() + || (table.getType() == PTableType.INDEX && table.isTransactional())) { + scan.setAttribute(BaseScannerRegionObserver.TX_STATE, txState); + } if (ptr.getLength() > 0) { byte[] uuidValue = ServerCacheClient.generateId(); scan.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue); scan.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, ptr.get()); - scan.setAttribute(BaseScannerRegionObserver.TX_STATE, txState); - ScanUtil.setClientVersion(scan, MetaDataProtocol.PHOENIX_VERSION); } ResultIterator iterator = aggPlan.iterator(); try {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d720f39e/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/OmidGCProcessor.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/OmidGCProcessor.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/OmidGCProcessor.java new file mode 100644 index 0000000..70658fb --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/OmidGCProcessor.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.coprocessor; + +import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; + + +public class OmidGCProcessor extends DelegateRegionObserver { + + public OmidGCProcessor() { + super(new BaseRegionObserver()); + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/d720f39e/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/OmidTransactionalProcessor.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/OmidTransactionalProcessor.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/OmidTransactionalProcessor.java new file mode 100644 index 0000000..fc246d4 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/OmidTransactionalProcessor.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.coprocessor; + +import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; + + +public class OmidTransactionalProcessor extends DelegateRegionObserver { + + public OmidTransactionalProcessor() { + super(new BaseRegionObserver()); + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/d720f39e/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixAccessController.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixAccessController.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixAccessController.java index aa406fd..0448972 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixAccessController.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixAccessController.java @@ -47,6 +47,7 @@ import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.ObserverContextImpl; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; import org.apache.hadoop.hbase.ipc.RpcServer; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos; import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.AccessControlService; import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost; @@ -59,11 +60,7 @@ import org.apache.hadoop.hbase.security.access.AuthResult; import org.apache.hadoop.hbase.security.access.Permission; import org.apache.hadoop.hbase.security.access.Permission.Action; import org.apache.hadoop.hbase.security.access.UserPermission; -import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.util.Bytes; -import com.google.protobuf.ByteString; -import com.google.protobuf.RpcCallback; -import com.google.protobuf.RpcController; import org.apache.phoenix.coprocessor.PhoenixMetaDataCoprocessorHost.PhoenixMetaDataControllerEnvironment; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; @@ -72,6 +69,10 @@ import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTableType; import org.apache.phoenix.util.MetaDataUtil; +import com.google.protobuf.ByteString; +import com.google.protobuf.RpcCallback; +import com.google.protobuf.RpcController; + public class PhoenixAccessController extends BaseMetaDataEndpointObserver { @@ -464,7 +465,7 @@ public class PhoenixAccessController extends BaseMetaDataEndpointObserver { private void callGetUserPermissionsRequest(final List<UserPermission> userPermissions, AccessControlService.Interface service , AccessControlProtos.GetUserPermissionsRequest request, RpcController controller) { - ((AccessControlService.Interface)service).getUserPermissions(controller, request, + service.getUserPermissions(controller, request, new RpcCallback<AccessControlProtos.GetUserPermissionsResponse>() { @Override public void run(AccessControlProtos.GetUserPermissionsResponse message) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/d720f39e/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java index e4fc149..d94ce7f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java @@ -136,6 +136,8 @@ import org.apache.phoenix.schema.types.PDouble; import org.apache.phoenix.schema.types.PFloat; import org.apache.phoenix.schema.types.PLong; import org.apache.phoenix.transaction.PhoenixTransactionContext; +import org.apache.phoenix.transaction.PhoenixTransactionProvider; +import org.apache.phoenix.transaction.TransactionFactory; import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.EncodedColumnsUtil; import org.apache.phoenix.util.EnvironmentEdgeManager; @@ -442,6 +444,12 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver byte[] replayMutations = scan.getAttribute(BaseScannerRegionObserver.REPLAY_WRITES); byte[] indexUUID = scan.getAttribute(PhoenixIndexCodec.INDEX_UUID); byte[] txState = scan.getAttribute(BaseScannerRegionObserver.TX_STATE); + byte[] clientVersionBytes = scan.getAttribute(BaseScannerRegionObserver.CLIENT_VERSION); + PhoenixTransactionProvider txnProvider = null; + if (txState != null) { + int clientVersion = clientVersionBytes == null ? ScanUtil.UNKNOWN_CLIENT_VERSION : Bytes.toInt(clientVersionBytes); + txnProvider = TransactionFactory.getTransactionProvider(txState, clientVersion); + } List<Expression> selectExpressions = null; byte[] upsertSelectTable = scan.getAttribute(BaseScannerRegionObserver.UPSERT_SELECT_TABLE); boolean isUpsert = false; @@ -550,7 +558,6 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver useIndexProto = false; } - byte[] clientVersionBytes = scan.getAttribute(BaseScannerRegionObserver.CLIENT_VERSION); if(needToWrite) { synchronized (lock) { if (isRegionClosingOrSplitting) { @@ -673,6 +680,10 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver valueGetter, ptr, results.get(0).getTimestamp(), env.getRegion().getRegionInfo().getStartKey(), env.getRegion().getRegionInfo().getEndKey()); + + if (txnProvider != null) { + put = txnProvider.markPutAsCommitted(put, ts, ts); + } indexMutations.add(put); } } @@ -740,6 +751,8 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver for (Mutation mutation : row.toRowMutations()) { if (replayMutations != null) { mutation.setAttribute(REPLAY_WRITES, replayMutations); + } else if (txnProvider != null && projectedTable.getType() == PTableType.INDEX) { + mutation = txnProvider.markPutAsCommitted((Put)mutation, ts, ts); } mutations.add(mutation); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/d720f39e/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java index d6a70f2..c0a81ec 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java @@ -297,7 +297,12 @@ public enum SQLExceptionCode { UNKNOWN_TRANSACTION_PROVIDER(1089,"44A20", "Unknown TRANSACTION_PROVIDER: "), CANNOT_START_TXN_IF_TXN_DISABLED(1091, "44A22", "Cannot start transaction if transactions are disabled."), CANNOT_MIX_TXN_PROVIDERS(1092, "44A23", "Cannot mix transaction providers: "), - CANNOT_ALTER_TABLE_FROM_NON_TXN_TO_TXNL(1093, "44A24", "Cannot alter table from non transactional to transactional for "), + CANNOT_ALTER_TABLE_FROM_NON_TXN_TO_TXNL(1093, "44A24", "Cannot alter table from non transactional to transactional for"), + UNSUPPORTED_COLUMN_ENCODING_FOR_TXN_PROVIDER(1094, "44A25", "Column encoding is not supported for"), + UNSUPPORTED_STORAGE_FORMAT_FOR_TXN_PROVIDER(1095, "44A26", "Only ONE_CELL_PER_COLUMN storage scheme is supported for"), + CANNOT_SWITCH_TXN_PROVIDERS(1096, "44A27", "Cannot switch transaction providers."), + TTL_UNSUPPORTED_FOR_TXN_TABLE(10947, "44A28", "TTL is not supported for"), + CANNOT_CREATE_LOCAL_INDEX_FOR_TXN_TABLE(10948, "44A29", "Local indexes cannot be created for"), /** Sequence related */ SEQUENCE_ALREADY_EXIST(1200, "42Z00", "Sequence already exists.", new Factory() { http://git-wip-us.apache.org/repos/asf/phoenix/blob/d720f39e/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java index cf17b90..4a75e52 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java @@ -48,8 +48,6 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.htrace.Span; import org.apache.htrace.TraceScope; -import org.apache.phoenix.cache.IndexMetaDataCache; -import org.apache.phoenix.cache.ServerCacheClient; import org.apache.phoenix.cache.ServerCacheClient.ServerCache; import org.apache.phoenix.compile.MutationPlan; import org.apache.phoenix.coprocessor.BaseScannerRegionObserver; @@ -299,7 +297,8 @@ public class MutationState implements SQLCloseable { public Table getHTable(PTable table) throws SQLException { Table htable = this.getConnection().getQueryServices().getTable(table.getPhysicalName().getBytes()); if (table.isTransactional() && phoenixTransactionContext.isTransactionRunning()) { - htable = phoenixTransactionContext.getTransactionalTable(htable, table.isImmutableRows()); + // We're only using this table for reading, so we want it wrapped even if it's an index + htable = phoenixTransactionContext.getTransactionalTable(htable, table.isImmutableRows() || table.getType() == PTableType.INDEX); } return htable; } @@ -564,8 +563,7 @@ public class MutationState implements SQLCloseable { MultiRowMutationState multiRowMutationState = mutations.remove(key); if (multiRowMutationState != null) { final List<Mutation> deleteMutations = Lists.newArrayList(); - generateMutations(tableRef, mutationTimestamp, serverTimestamp, multiRowMutationState, - deleteMutations, null); + generateMutations(key, mutationTimestamp, serverTimestamp, multiRowMutationState, deleteMutations, null); if (indexMutations == null) { indexMutations = deleteMutations; } else { @@ -968,7 +966,12 @@ public class MutationState implements SQLCloseable { uncommittedPhysicalNames.add(table.getPhysicalName().getString()); phoenixTransactionContext.markDMLFence(table); } - hTable = phoenixTransactionContext.getTransactionalTableWriter(connection, table, hTable, !tableInfo.isDataTable()); + // Only pass true for last argument if the index is being written to on it's own (i.e. initial + // index population), not if it's being written to for normal maintenance due to writes to + // the data table. This case is different because the initial index population does not need + // to be done transactionally since the index is only made active after all writes have + // occurred successfully. + hTable = phoenixTransactionContext.getTransactionalTableWriter(connection, table, hTable, tableInfo.isDataTable() && table.getType() == PTableType.INDEX); } numMutations = mutationList.size(); GLOBAL_MUTATION_BATCH_SIZE.update(numMutations); http://git-wip-us.apache.org/repos/asf/phoenix/blob/d720f39e/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxIndexMutationGenerator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxIndexMutationGenerator.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxIndexMutationGenerator.java index c209b1f..de13605 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxIndexMutationGenerator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxIndexMutationGenerator.java @@ -34,6 +34,7 @@ import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Result; @@ -60,6 +61,7 @@ import org.apache.phoenix.index.IndexMaintainer; import org.apache.phoenix.index.PhoenixIndexCodec; import org.apache.phoenix.index.PhoenixIndexMetaData; import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.query.ConnectionQueryServices; import org.apache.phoenix.query.KeyRange; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.types.PVarbinary; @@ -78,14 +80,26 @@ import com.google.common.primitives.Longs; public class PhoenixTxIndexMutationGenerator { private final PhoenixIndexCodec codec; private final PhoenixIndexMetaData indexMetaData; + private final ConnectionQueryServices services; + private final byte[] regionStartKey; + private final byte[] regionEndKey; + private final byte[] tableName; - public PhoenixTxIndexMutationGenerator(Configuration conf, PhoenixIndexMetaData indexMetaData, byte[] tableName, byte[] regionStartKey, byte[] regionEndKey) { + private PhoenixTxIndexMutationGenerator(ConnectionQueryServices services, Configuration conf, PhoenixIndexMetaData indexMetaData, byte[] tableName, byte[] regionStartKey, byte[] regionEndKey) { + this.services = services; this.indexMetaData = indexMetaData; - this.codec = new PhoenixIndexCodec(conf, regionStartKey, regionEndKey, tableName); + this.regionStartKey = regionStartKey; + this.regionEndKey = regionEndKey; + this.tableName = tableName; + this.codec = new PhoenixIndexCodec(conf, tableName); } - public PhoenixTxIndexMutationGenerator(Configuration conf, PhoenixIndexMetaData indexMetaData, byte[] tableName) { - this(conf, indexMetaData, tableName, null, null); + public PhoenixTxIndexMutationGenerator(Configuration conf, PhoenixIndexMetaData indexMetaData, byte[] tableName, byte[] regionStartKey, byte[] regionEndKey) { + this(null, conf, indexMetaData, tableName, regionStartKey, regionEndKey); + } + + public PhoenixTxIndexMutationGenerator(ConnectionQueryServices services, PhoenixIndexMetaData indexMetaData, byte[] tableName) { + this(services, services.getConfiguration(), indexMetaData, tableName, null, null); } private static void addMutation(Map<ImmutableBytesPtr, MultiMutation> mutations, ImmutableBytesPtr row, Mutation m) { @@ -178,7 +192,7 @@ public class PhoenixTxIndexMutationGenerator { scan.addColumn(indexMaintainers.get(0).getDataEmptyKeyValueCF(), emptyKeyValueQualifier); ScanRanges scanRanges = ScanRanges.create(SchemaUtil.VAR_BINARY_SCHEMA, Collections.singletonList(keys), ScanUtil.SINGLE_COLUMN_SLOT_SPAN, null, true, -1); scanRanges.initializeScan(scan); - Table txTable = indexMetaData.getTransactionContext().getTransactionalTable(htable, isImmutable); + Table txTable = indexMetaData.getTransactionContext().getTransactionalTable(htable, true); // For rollback, we need to see all versions, including // the last committed version as there may be multiple // checkpointed versions. @@ -312,7 +326,18 @@ public class PhoenixTxIndexMutationGenerator { private void generateDeletes(PhoenixIndexMetaData indexMetaData, Collection<Pair<Mutation, byte[]>> indexUpdates, byte[] attribValue, TxTableState state) throws IOException { - Iterable<IndexUpdate> deletes = codec.getIndexDeletes(state, indexMetaData); + byte[] regionStartKey = this.regionStartKey; + byte[] regionEndKey = this.regionEndKey; + if (services != null && indexMetaData.hasLocalIndexes()) { + try { + HRegionLocation tableRegionLocation = services.getTableRegionLocation(tableName, state.getCurrentRowKey()); + regionStartKey = tableRegionLocation.getRegionInfo().getStartKey(); + regionEndKey = tableRegionLocation.getRegionInfo().getEndKey(); + } catch (SQLException e) { + throw new IOException(e); + } + } + Iterable<IndexUpdate> deletes = codec.getIndexDeletes(state, indexMetaData, regionStartKey, regionEndKey); for (IndexUpdate delete : deletes) { if (delete.isValid()) { delete.getUpdate().setAttribute(PhoenixTransactionContext.TX_ROLLBACK_ATTRIBUTE_KEY, attribValue); @@ -327,7 +352,18 @@ public class PhoenixTxIndexMutationGenerator { TxTableState state) throws IOException { state.applyMutation(); - Iterable<IndexUpdate> puts = codec.getIndexUpserts(state, indexMetaData); + byte[] regionStartKey = this.regionStartKey; + byte[] regionEndKey = this.regionEndKey; + if (services != null && indexMetaData.hasLocalIndexes()) { + try { + HRegionLocation tableRegionLocation = services.getTableRegionLocation(tableName, state.getCurrentRowKey()); + regionStartKey = tableRegionLocation.getRegionInfo().getStartKey(); + regionEndKey = tableRegionLocation.getRegionInfo().getEndKey(); + } catch (SQLException e) { + throw new IOException(e); + } + } + Iterable<IndexUpdate> puts = codec.getIndexUpserts(state, indexMetaData, regionStartKey, regionEndKey); boolean validPut = false; for (IndexUpdate put : puts) { if (put.isValid()) { @@ -475,8 +511,8 @@ public class PhoenixTxIndexMutationGenerator { }; try { PhoenixIndexMetaData indexMetaData = new PhoenixIndexMetaData(indexMetaDataCache, attributes); - return new PhoenixTxIndexMutationGenerator(connection.getQueryServices().getConfiguration(), indexMetaData, - table.getPhysicalName().getBytes()); + return new PhoenixTxIndexMutationGenerator(connection.getQueryServices(),connection.getQueryServices().getConfiguration(), indexMetaData, + table.getPhysicalName().getBytes(), null, null); } catch (IOException e) { throw new RuntimeException(e); // Impossible } http://git-wip-us.apache.org/repos/asf/phoenix/blob/d720f39e/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java index 968bdab..0b7d9ef 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java @@ -58,7 +58,7 @@ public abstract class BaseIndexBuilder implements IndexBuilder { Constructor<? extends IndexCodec> meth = codecClass.getDeclaredConstructor(new Class[0]); meth.setAccessible(true); this.codec = meth.newInstance(); - this.codec.initialize(conf, env.getRegionInfo().getStartKey(), env.getRegionInfo().getEndKey(), env.getRegion().getRegionInfo().getTable().getName()); + this.codec.initialize(conf, env.getRegion().getRegionInfo().getTable().getName()); } catch (Exception e) { throw new IOException(e); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/d720f39e/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexCodec.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexCodec.java index 7dde941..0b70de3 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexCodec.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexCodec.java @@ -39,10 +39,12 @@ public interface IndexCodec { * the current state of the table that needs to be cleaned up. Generally, you only care about the latest * column values, for each column you are indexing for each index table. * @param context TODO + * @param regionStartKey TODO + * @param regionEndKey TODO * @return the pairs of (deletes, index table name) that should be applied. * @throws IOException */ - public Iterable<IndexUpdate> getIndexDeletes(TableState state, IndexMetaData context) throws IOException; + public Iterable<IndexUpdate> getIndexDeletes(TableState state, IndexMetaData context, byte[] regionStartKey, byte[] regionEndKey) throws IOException; // table state has the pending update already applied, before calling // get the new index entries @@ -59,10 +61,12 @@ public interface IndexCodec { * the current state of the table that needs to an index update Generally, you only care about the latest * column values, for each column you are indexing for each index table. * @param context TODO + * @param regionStartKey TODO + * @param regionEndKey TODO * @return the pairs of (updates,index table name) that should be applied. * @throws IOException */ - public Iterable<IndexUpdate> getIndexUpserts(TableState state, IndexMetaData context) throws IOException; + public Iterable<IndexUpdate> getIndexUpserts(TableState state, IndexMetaData context, byte[] regionStartKey, byte[] regionEndKey) throws IOException; /** * This allows the codec to dynamically change whether or not indexing should take place for a table. If it doesn't @@ -80,5 +84,5 @@ public interface IndexCodec { */ public boolean isEnabled(Mutation m) throws IOException; - public void initialize(Configuration conf, byte[] startKey, byte[] endKey, byte[] tableName); + public void initialize(Configuration conf, byte[] tableName); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/d720f39e/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java index 59a8a2b..54d7f87 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java @@ -125,7 +125,7 @@ public class LocalTableState implements TableState { * state for any of the columns you are indexing. * <p> * <i>NOTE:</i> This method should <b>not</b> be used during - * {@link IndexCodec#getIndexDeletes(TableState, BatchState)} as the pending update will not yet have been + * {@link IndexCodec#getIndexDeletes(TableState, BatchState, byte[], byte[])} as the pending update will not yet have been * applied - you are merely attempting to cleanup the current state and therefore do <i>not</i> * need to track the indexed columns. * <p> http://git-wip-us.apache.org/repos/asf/phoenix/blob/d720f39e/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java index 60f86d4..1e4da87 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java @@ -183,7 +183,7 @@ public class NonTxIndexBuilder extends BaseIndexBuilder { throws IOException { // get the index updates for this current batch - Iterable<IndexUpdate> upserts = codec.getIndexUpserts(state, indexMetaData); + Iterable<IndexUpdate> upserts = codec.getIndexUpserts(state, indexMetaData, env.getRegionInfo().getStartKey(), env.getRegionInfo().getEndKey()); state.resetTrackedColumns(); /* @@ -221,7 +221,7 @@ public class NonTxIndexBuilder extends BaseIndexBuilder { } /** - * Get the index deletes from the codec {@link IndexCodec#getIndexDeletes(TableState, IndexMetaData)} and then add them to the + * Get the index deletes from the codec {@link IndexCodec#getIndexDeletes(TableState, IndexMetaData, byte[], byte[])} and then add them to the * update map. * <p> * Expects the {@link LocalTableState} to already be correctly setup (correct timestamp, updates applied, etc). @@ -231,7 +231,7 @@ public class NonTxIndexBuilder extends BaseIndexBuilder { */ protected void addDeleteUpdatesToMap(IndexUpdateManager updateMap, LocalTableState state, long ts, IndexMetaData indexMetaData) throws IOException { - Iterable<IndexUpdate> cleanup = codec.getIndexDeletes(state, indexMetaData); + Iterable<IndexUpdate> cleanup = codec.getIndexDeletes(state, indexMetaData, env.getRegionInfo().getStartKey(), env.getRegionInfo().getEndKey()); if (cleanup != null) { for (IndexUpdate d : cleanup) { if (!d.isValid()) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/d720f39e/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java index 3416b21..ffbf98f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java @@ -101,7 +101,7 @@ import org.apache.phoenix.schema.ValueSchema.Field; import org.apache.phoenix.schema.tuple.BaseTuple; import org.apache.phoenix.schema.tuple.ValueGetterTuple; import org.apache.phoenix.schema.types.PDataType; -import org.apache.phoenix.schema.types.PLong; +import org.apache.phoenix.transaction.PhoenixTransactionProvider.Feature; import org.apache.phoenix.util.BitSet; import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.EncodedColumnsUtil; @@ -184,9 +184,24 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { */ public static void serialize(PTable dataTable, ImmutableBytesWritable ptr, PhoenixConnection connection) { List<PTable> indexes = dataTable.getIndexes(); - serialize(dataTable, ptr, indexes, connection); + serializeServerMaintainedIndexes(dataTable, ptr, indexes, connection); } + public static void serializeServerMaintainedIndexes(PTable dataTable, ImmutableBytesWritable ptr, + List<PTable> indexes, PhoenixConnection connection) { + Iterator<PTable> indexesItr = Collections.emptyListIterator(); + boolean onlyLocalIndexes = dataTable.isImmutableRows() || dataTable.isTransactional(); + if (onlyLocalIndexes) { + if (!dataTable.isTransactional() + || !dataTable.getTransactionProvider().getTransactionProvider().isUnsupported(Feature.MAINTAIN_LOCAL_INDEX_ON_SERVER)) { + indexesItr = maintainedLocalIndexes(indexes.iterator()); + } + } else { + indexesItr = maintainedIndexes(indexes.iterator()); + } + + serialize(dataTable, ptr, Lists.newArrayList(indexesItr), connection); + } /** * For client-side to serialize all IndexMaintainers for a given table * @param dataTable data table @@ -195,22 +210,11 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { */ public static void serialize(PTable dataTable, ImmutableBytesWritable ptr, List<PTable> indexes, PhoenixConnection connection) { - Iterator<PTable> indexesItr; - boolean onlyLocalIndexes = dataTable.isImmutableRows() || dataTable.isTransactional(); - if (onlyLocalIndexes) { - indexesItr = maintainedLocalIndexes(indexes.iterator()); - } else { - indexesItr = maintainedIndexes(indexes.iterator()); - } - if (!indexesItr.hasNext()) { + if (indexes.isEmpty()) { ptr.set(ByteUtil.EMPTY_BYTE_ARRAY); return; } - int nIndexes = 0; - while (indexesItr.hasNext()) { - nIndexes++; - indexesItr.next(); - } + int nIndexes = indexes.size(); ByteArrayOutputStream stream = new ByteArrayOutputStream(); DataOutputStream output = new DataOutputStream(stream); try { @@ -218,11 +222,8 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { WritableUtils.writeVInt(output, nIndexes * (dataTable.getBucketNum() == null ? 1 : -1)); // Write out data row key schema once, since it's the same for all index maintainers dataTable.getRowKeySchema().write(output); - indexesItr = onlyLocalIndexes - ? maintainedLocalIndexes(indexes.iterator()) - : maintainedIndexes(indexes.iterator()); - while (indexesItr.hasNext()) { - org.apache.phoenix.coprocessor.generated.ServerCachingProtos.IndexMaintainer proto = IndexMaintainer.toProto(indexesItr.next().getIndexMaintainer(dataTable, connection)); + for (PTable index : indexes) { + org.apache.phoenix.coprocessor.generated.ServerCachingProtos.IndexMaintainer proto = IndexMaintainer.toProto(index.getIndexMaintainer(dataTable, connection)); byte[] protoBytes = proto.toByteArray(); WritableUtils.writeVInt(output, protoBytes.length); output.write(protoBytes); @@ -285,31 +286,33 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { } private static List<IndexMaintainer> deserialize(byte[] buf, int offset, int length, boolean useProtoForIndexMaintainer) { - ByteArrayInputStream stream = new ByteArrayInputStream(buf, offset, length); - DataInput input = new DataInputStream(stream); List<IndexMaintainer> maintainers = Collections.emptyList(); - try { - int size = WritableUtils.readVInt(input); - boolean isDataTableSalted = size < 0; - size = Math.abs(size); - RowKeySchema rowKeySchema = new RowKeySchema(); - rowKeySchema.readFields(input); - maintainers = Lists.newArrayListWithExpectedSize(size); - for (int i = 0; i < size; i++) { - if (useProtoForIndexMaintainer) { - int protoSize = WritableUtils.readVInt(input); - byte[] b = new byte[protoSize]; - input.readFully(b); - org.apache.phoenix.coprocessor.generated.ServerCachingProtos.IndexMaintainer proto = ServerCachingProtos.IndexMaintainer.parseFrom(b); - maintainers.add(IndexMaintainer.fromProto(proto, rowKeySchema, isDataTableSalted)); - } else { - IndexMaintainer maintainer = new IndexMaintainer(rowKeySchema, isDataTableSalted); - maintainer.readFields(input); - maintainers.add(maintainer); + if (length > 0) { + ByteArrayInputStream stream = new ByteArrayInputStream(buf, offset, length); + DataInput input = new DataInputStream(stream); + try { + int size = WritableUtils.readVInt(input); + boolean isDataTableSalted = size < 0; + size = Math.abs(size); + RowKeySchema rowKeySchema = new RowKeySchema(); + rowKeySchema.readFields(input); + maintainers = Lists.newArrayListWithExpectedSize(size); + for (int i = 0; i < size; i++) { + if (useProtoForIndexMaintainer) { + int protoSize = WritableUtils.readVInt(input); + byte[] b = new byte[protoSize]; + input.readFully(b); + org.apache.phoenix.coprocessor.generated.ServerCachingProtos.IndexMaintainer proto = ServerCachingProtos.IndexMaintainer.parseFrom(b); + maintainers.add(IndexMaintainer.fromProto(proto, rowKeySchema, isDataTableSalted)); + } else { + IndexMaintainer maintainer = new IndexMaintainer(rowKeySchema, isDataTableSalted); + maintainer.readFields(input); + maintainers.add(maintainer); + } } + } catch (IOException e) { + throw new RuntimeException(e); // Impossible } - } catch (IOException e) { - throw new RuntimeException(e); // Impossible } return maintainers; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/d720f39e/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java index 04dcbc4..bb50fa1 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java @@ -36,8 +36,8 @@ import com.google.common.collect.Sets; /** * Phoenix-based {@link IndexCodec}. Manages all the logic of how to cleanup an index ( - * {@link #getIndexDeletes(TableState, IndexMetaData)}) as well as what the new index state should be ( - * {@link #getIndexUpserts(TableState, IndexMetaData)}). + * {@link #getIndexDeletes(TableState, IndexMetaData, byte[], byte[])}) as well as what the new index state should be ( + * {@link #getIndexUpserts(TableState, IndexMetaData, byte[], byte[])}). */ public class PhoenixIndexCodec extends BaseIndexCodec { public static final String INDEX_MD = "IdxMD"; @@ -46,23 +46,19 @@ public class PhoenixIndexCodec extends BaseIndexCodec { public static final String INDEX_MAINTAINERS = "IndexMaintainers"; public static KeyValueBuilder KV_BUILDER = GenericKeyValueBuilder.INSTANCE; - private byte[] regionStartKey; - private byte[] regionEndKey; private byte[] tableName; public PhoenixIndexCodec() { } - public PhoenixIndexCodec(Configuration conf, byte[] regionStartKey, byte[] regionEndKey, byte[] tableName) { - initialize(conf, regionStartKey, regionEndKey, tableName); + public PhoenixIndexCodec(Configuration conf, byte[] tableName) { + initialize(conf, tableName); } @Override - public void initialize(Configuration conf, byte[] regionStartKey, byte[] regionEndKey, byte[] tableName) { - this.regionStartKey = regionStartKey; - this.regionEndKey = regionEndKey; + public void initialize(Configuration conf, byte[] tableName) { this.tableName = tableName; } @@ -74,7 +70,7 @@ public class PhoenixIndexCodec extends BaseIndexCodec { } @Override - public Iterable<IndexUpdate> getIndexUpserts(TableState state, IndexMetaData context) throws IOException { + public Iterable<IndexUpdate> getIndexUpserts(TableState state, IndexMetaData context, byte[] regionStartKey, byte[] regionEndKey) throws IOException { PhoenixIndexMetaData metaData = (PhoenixIndexMetaData)context; List<IndexMaintainer> indexMaintainers = metaData.getIndexMaintainers(); if (indexMaintainers.get(0).isRowDeleted(state.getPendingUpdate())) { @@ -97,7 +93,7 @@ public class PhoenixIndexCodec extends BaseIndexCodec { } @Override - public Iterable<IndexUpdate> getIndexDeletes(TableState state, IndexMetaData context) throws IOException { + public Iterable<IndexUpdate> getIndexDeletes(TableState state, IndexMetaData context, byte[] regionStartKey, byte[] regionEndKey) throws IOException { PhoenixIndexMetaData metaData = (PhoenixIndexMetaData)context; List<IndexMaintainer> indexMaintainers = metaData.getIndexMaintainers(); ImmutableBytesWritable ptr = new ImmutableBytesWritable(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/d720f39e/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java index 2ddd659..73737b4 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java @@ -100,7 +100,7 @@ public class PhoenixTransactionalIndexer implements RegionObserver, RegionCoproc public void start(CoprocessorEnvironment e) throws IOException { final RegionCoprocessorEnvironment env = (RegionCoprocessorEnvironment)e; String serverName = env.getServerName().getServerName(); - codec = new PhoenixIndexCodec(env.getConfiguration(), env.getRegion().getRegionInfo().getStartKey(), env.getRegion().getRegionInfo().getEndKey(), env.getRegionInfo().getTable().getName()); + codec = new PhoenixIndexCodec(env.getConfiguration(), env.getRegionInfo().getTable().getName()); DelegateRegionCoprocessorEnvironment indexWriterEnv = new DelegateRegionCoprocessorEnvironment(env, ConnectionType.INDEX_WRITER_CONNECTION); // setup the actual index writer // For transactional tables, we keep the index active upon a write failure http://git-wip-us.apache.org/repos/asf/phoenix/blob/d720f39e/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java index e2ea759..f02e9d3 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java @@ -90,7 +90,10 @@ public class ScanningResultIterator implements ResultIterator { private void getScanMetrics() { if (!scanMetricsUpdated && scanMetricsEnabled) { - ScanMetrics scanMetrics = scanner.getScanMetrics(); + ScanMetrics scanMetrics = scan.getScanMetrics(); + if (scanMetrics == null) { + return; + } Map<String, Long> scanMetricsMap = scanMetrics.getMetricsMap(); if(scanMetricsMap == null) { return; http://git-wip-us.apache.org/repos/asf/phoenix/blob/d720f39e/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java index 9599b1f..267aa1d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java @@ -51,7 +51,6 @@ import org.apache.phoenix.join.HashCacheClient; import org.apache.phoenix.monitoring.ScanMetricsHolder; import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.schema.PTable; -import org.apache.phoenix.schema.StaleRegionBoundaryCacheException; import org.apache.phoenix.schema.tuple.Tuple; import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.Closeables; @@ -193,7 +192,7 @@ public class TableResultIterator implements ResultIterator { if (retry <= 0) { throw e1; } - Long cacheId = ((HashJoinCacheNotFoundException) e1).getCacheId(); + Long cacheId = e1.getCacheId(); retry--; try { ServerCache cache = caches == null ? null : http://git-wip-us.apache.org/repos/asf/phoenix/blob/d720f39e/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/DirectHTableWriter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/DirectHTableWriter.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/DirectHTableWriter.java index 11a8176..6100b20 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/DirectHTableWriter.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/DirectHTableWriter.java @@ -26,11 +26,17 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.mapreduce.TableOutputFormat; +import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil; +import org.apache.phoenix.transaction.PhoenixTransactionProvider; +import org.apache.phoenix.transaction.TransactionFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.collect.Lists; + /** * Writes mutations directly to HBase using HBase front-door APIs. */ @@ -65,7 +71,24 @@ public class DirectHTableWriter { public void write(List<Mutation> mutations) throws IOException, InterruptedException { Object[] results = new Object[mutations.size()]; - table.batch(mutations, results); + String txnIdStr = conf.get(PhoenixConfigurationUtil.TX_SCN_VALUE); + if (txnIdStr == null) { + table.batch(mutations, results); + } else { + long ts = Long.parseLong(txnIdStr); + PhoenixTransactionProvider provider = TransactionFactory.Provider.getDefault().getTransactionProvider(); + String txnProviderStr = conf.get(PhoenixConfigurationUtil.TX_PROVIDER); + if (txnProviderStr != null) { + provider = TransactionFactory.Provider.valueOf(txnProviderStr).getTransactionProvider(); + } + List<Mutation> shadowedMutations = Lists.newArrayListWithExpectedSize(mutations.size()); + for (Mutation m : mutations) { + if (m instanceof Put) { + shadowedMutations.add(provider.markPutAsCommitted((Put)m, ts, ts)); + } + } + table.batch(shadowedMutations, results); + } } protected Configuration getConf() { http://git-wip-us.apache.org/repos/asf/phoenix/blob/d720f39e/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java index 94d5e73..c7e7b88 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java @@ -383,6 +383,7 @@ public class IndexTool extends Configured implements Tool { if (pdataTable.isTransactional()) { configuration.set(PhoenixConfigurationUtil.TX_SCN_VALUE, Long.toString(TransactionUtil.convertToNanoseconds(maxTimeRange))); + configuration.set(PhoenixConfigurationUtil.TX_PROVIDER, pdataTable.getTransactionProvider().name()); } configuration.set(PhoenixConfigurationUtil.CURRENT_SCN_VALUE, Long.toString(maxTimeRange)); http://git-wip-us.apache.org/repos/asf/phoenix/blob/d720f39e/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportMapper.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportMapper.java index 6f469e6..8318adf 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportMapper.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportMapper.java @@ -21,28 +21,37 @@ import java.io.IOException; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.SQLException; +import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Properties; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Mapper; +import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.mapreduce.ImportPreUpsertKeyValueProcessor; import org.apache.phoenix.mapreduce.PhoenixJobCounters; import org.apache.phoenix.mapreduce.util.ConnectionUtil; import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil; +import org.apache.phoenix.transaction.PhoenixTransactionProvider; +import org.apache.phoenix.transaction.TransactionFactory; import org.apache.phoenix.util.ColumnInfo; import org.apache.phoenix.util.PhoenixKeyValueUtil; import org.apache.phoenix.util.PhoenixRuntime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.collect.Lists; + /** * Mapper that hands over rows from data table to the index table. * @@ -95,6 +104,18 @@ public class PhoenixIndexImportMapper extends Mapper<NullWritable, PhoenixIndexD context.getCounter(PhoenixJobCounters.INPUT_RECORDS).increment(1); + PhoenixTransactionProvider provider = null; + Configuration conf = context.getConfiguration(); + long ts = HConstants.LATEST_TIMESTAMP; + String txnIdStr = conf.get(PhoenixConfigurationUtil.TX_SCN_VALUE); + if (txnIdStr != null) { + ts = Long.parseLong(txnIdStr); + provider = TransactionFactory.Provider.getDefault().getTransactionProvider(); + String txnProviderStr = conf.get(PhoenixConfigurationUtil.TX_PROVIDER); + if (txnProviderStr != null) { + provider = TransactionFactory.Provider.valueOf(txnProviderStr).getTransactionProvider(); + } + } try { final ImmutableBytesWritable outputKey = new ImmutableBytesWritable(); final List<Object> values = record.getValues(); @@ -102,16 +123,30 @@ public class PhoenixIndexImportMapper extends Mapper<NullWritable, PhoenixIndexD indxWritable.write(this.pStatement); this.pStatement.execute(); - final Iterator<Pair<byte[], List<Cell>>> uncommittedDataIterator = PhoenixRuntime.getUncommittedDataIterator(connection, true); - while (uncommittedDataIterator.hasNext()) { - Pair<byte[], List<Cell>> kvPair = uncommittedDataIterator.next(); - if (Bytes.compareTo(Bytes.toBytes(indexTableName), kvPair.getFirst()) != 0) { + PhoenixConnection pconn = connection.unwrap(PhoenixConnection.class); + final Iterator<Pair<byte[],List<Mutation>>> iterator = pconn.getMutationState().toMutations(true); + while (iterator.hasNext()) { + Pair<byte[], List<Mutation>> pair = iterator.next(); + if (Bytes.compareTo(Bytes.toBytes(indexTableName), pair.getFirst()) != 0) { // skip edits for other tables continue; } - List<Cell> keyValueList = kvPair.getSecond(); - keyValueList = preUpdateProcessor.preUpsert(kvPair.getFirst(), keyValueList); - for (Cell kv : keyValueList) { + List<Cell> keyValues = Lists.newArrayListWithExpectedSize(pair.getSecond().size() * 5); // Guess-timate 5 key values per row + for (Mutation mutation : pair.getSecond()) { + if (mutation instanceof Put) { + if (provider != null) { + mutation = provider.markPutAsCommitted((Put)mutation, ts, ts); + } + for (List<Cell> cellList : mutation.getFamilyCellMap().values()) { + List<Cell>keyValueList = preUpdateProcessor.preUpsert(mutation.getRow(), cellList); + for (Cell keyValue : keyValueList) { + keyValues.add(keyValue); + } + } + } + } + Collections.sort(keyValues, pconn.getKeyValueBuilder().getKeyValueComparator()); + for (Cell kv : keyValues) { outputKey.set(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength()); context.write(outputKey, PhoenixKeyValueUtil.maybeCopyCell(kv)); } @@ -137,4 +172,4 @@ public class PhoenixIndexImportMapper extends Mapper<NullWritable, PhoenixIndexD } } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/d720f39e/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java index 909c85c..35f82c1 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java @@ -105,6 +105,8 @@ public final class PhoenixConfigurationUtil { public static final String TX_SCN_VALUE = "phoenix.mr.txscn.value"; + public static final String TX_PROVIDER = "phoenix.mr.txprovider"; + /** Configuration key for the class name of an ImportPreUpsertKeyValueProcessor */ public static final String UPSERT_HOOK_CLASS_CONFKEY = "phoenix.mapreduce.import.kvprocessor"; http://git-wip-us.apache.org/repos/asf/phoenix/blob/d720f39e/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java index 5394d05..942e77f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java @@ -169,7 +169,7 @@ public interface ConnectionQueryServices extends QueryServices, MetaDataMutated public QueryLoggerDisruptor getQueryDisruptor(); - public PhoenixTransactionClient initTransactionClient(TransactionFactory.Provider provider); + public PhoenixTransactionClient initTransactionClient(TransactionFactory.Provider provider) throws SQLException; /** * Writes a cell to SYSTEM.MUTEX using checkAndPut to ensure only a single client can execute a @@ -184,4 +184,4 @@ public interface ConnectionQueryServices extends QueryServices, MetaDataMutated */ public void deleteMutexCell(String tenantId, String schemaName, String tableName, String columnName, String familyName) throws SQLException; -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/d720f39e/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java index f1e3d82..08f7310 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java @@ -152,7 +152,6 @@ import org.apache.phoenix.coprocessor.MetaDataRegionObserver; import org.apache.phoenix.coprocessor.ScanRegionObserver; import org.apache.phoenix.coprocessor.SequenceRegionObserver; import org.apache.phoenix.coprocessor.ServerCachingEndpointImpl; -import org.apache.phoenix.coprocessor.TephraTransactionalProcessor; import org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver; import org.apache.phoenix.coprocessor.generated.MetaDataProtos; import org.apache.phoenix.coprocessor.generated.MetaDataProtos.AddColumnRequest; @@ -242,6 +241,7 @@ import org.apache.phoenix.schema.types.PVarbinary; import org.apache.phoenix.schema.types.PVarchar; import org.apache.phoenix.transaction.PhoenixTransactionClient; import org.apache.phoenix.transaction.PhoenixTransactionContext; +import org.apache.phoenix.transaction.PhoenixTransactionProvider; import org.apache.phoenix.transaction.TransactionFactory; import org.apache.phoenix.transaction.TransactionFactory.Provider; import org.apache.phoenix.util.ByteUtil; @@ -868,10 +868,8 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement if(!newDesc.hasCoprocessor(ServerCachingEndpointImpl.class.getName())) { builder.addCoprocessor(ServerCachingEndpointImpl.class.getName(), null, priority, null); } - // For ALTER TABLE - boolean nonTxToTx = Boolean.TRUE.equals(tableProps.get(PhoenixTransactionContext.READ_NON_TX_DATA)); - boolean isTransactional = - Boolean.TRUE.equals(tableProps.get(TableProperty.TRANSACTIONAL.name())) || nonTxToTx; + TransactionFactory.Provider provider = getTransactionProvider(tableProps); + boolean isTransactional = (provider != null); // TODO: better encapsulation for this // Since indexes can't have indexes, don't install our indexing coprocessor for indexes. // Also don't install on the SYSTEM.CATALOG and SYSTEM.STATS table because we use @@ -934,22 +932,27 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } if (isTransactional) { - TransactionFactory.Provider provider = (TransactionFactory.Provider)TableProperty.TRANSACTION_PROVIDER.getValue(tableProps); - if (provider == null) { - String providerValue = this.props.get(QueryServices.DEFAULT_TRANSACTION_PROVIDER_ATTRIB, QueryServicesOptions.DEFAULT_TRANSACTION_PROVIDER); - provider = (TransactionFactory.Provider)TableProperty.TRANSACTION_PROVIDER.getValue(providerValue); - } Class<? extends RegionObserver> coprocessorClass = provider.getTransactionProvider().getCoprocessor(); if (!newDesc.hasCoprocessor(coprocessorClass.getName())) { builder.addCoprocessor(coprocessorClass.getName(), null, priority - 10, null); } + Class<? extends RegionObserver> coprocessorGCClass = provider.getTransactionProvider().getGCCoprocessor(); + if (coprocessorGCClass != null) { + if (!newDesc.hasCoprocessor(coprocessorGCClass.getName())) { + builder.addCoprocessor(coprocessorGCClass.getName(), null, priority - 10, null); + } + } } else { // Remove all potential transactional coprocessors - for (TransactionFactory.Provider provider : TransactionFactory.Provider.values()) { - Class<? extends RegionObserver> coprocessorClass = provider.getTransactionProvider().getCoprocessor(); + for (TransactionFactory.Provider aprovider : TransactionFactory.Provider.values()) { + Class<? extends RegionObserver> coprocessorClass = aprovider.getTransactionProvider().getCoprocessor(); + Class<? extends RegionObserver> coprocessorGCClass = aprovider.getTransactionProvider().getGCCoprocessor(); if (coprocessorClass != null && newDesc.hasCoprocessor(coprocessorClass.getName())) { builder.removeCoprocessor(coprocessorClass.getName()); } + if (coprocessorGCClass != null && newDesc.hasCoprocessor(coprocessorGCClass.getName())) { + builder.removeCoprocessor(coprocessorGCClass.getName()); + } } } } catch (IOException e) { @@ -957,6 +960,11 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } } + private TransactionFactory.Provider getTransactionProvider(Map<String,Object> tableProps) { + TransactionFactory.Provider provider = (TransactionFactory.Provider)TableProperty.TRANSACTION_PROVIDER.getValue(tableProps); + return provider; + } + private static interface RetriableOperation { boolean checkForCompletion() throws TimeoutException, IOException; String getOperationName(); @@ -1177,15 +1185,30 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement if (!modifyExistingMetaData) { return existingDesc; // Caller already knows that no metadata was changed } - boolean willBeTx = Boolean.TRUE.equals(props.get(TableProperty.TRANSACTIONAL.name())); + TransactionFactory.Provider provider = getTransactionProvider(props); + boolean willBeTx = provider != null; // If mapping an existing table as transactional, set property so that existing // data is correctly read. if (willBeTx) { - newDesc.setValue(PhoenixTransactionContext.READ_NON_TX_DATA, Boolean.TRUE.toString()); + if (!equalTxCoprocessor(provider, existingDesc, newDesc.build())) { + // Cannot switch between different providers + if (hasTxCoprocessor(existingDesc)) { + throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_SWITCH_TXN_PROVIDERS) + .setSchemaName(SchemaUtil.getSchemaNameFromFullName(physicalTableName)) + .setTableName(SchemaUtil.getTableNameFromFullName(physicalTableName)).build().buildException(); + } + if (provider.getTransactionProvider().isUnsupported(PhoenixTransactionProvider.Feature.ALTER_NONTX_TO_TX)) { + throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_ALTER_TABLE_FROM_NON_TXN_TO_TXNL) + .setMessage(provider.name()) + .setSchemaName(SchemaUtil.getSchemaNameFromFullName(physicalTableName)) + .setTableName(SchemaUtil.getTableNameFromFullName(physicalTableName)).build().buildException(); + } + newDesc.setValue(PhoenixTransactionContext.READ_NON_TX_DATA, Boolean.TRUE.toString()); + } } else { // If we think we're creating a non transactional table when it's already // transactional, don't allow. - if (existingDesc.hasCoprocessor(TephraTransactionalProcessor.class.getName())) { + if (hasTxCoprocessor(existingDesc)) { throw new SQLExceptionInfo.Builder(SQLExceptionCode.TX_MAY_NOT_SWITCH_TO_NON_TX) .setSchemaName(SchemaUtil.getSchemaNameFromFullName(physicalTableName)) .setTableName(SchemaUtil.getTableNameFromFullName(physicalTableName)).build().buildException(); @@ -1219,6 +1242,21 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } return null; // will never make it here } + + private static boolean hasTxCoprocessor(TableDescriptor descriptor) { + for (TransactionFactory.Provider provider : TransactionFactory.Provider.values()) { + Class<? extends RegionObserver> coprocessorClass = provider.getTransactionProvider().getCoprocessor(); + if (coprocessorClass != null && descriptor.hasCoprocessor(coprocessorClass.getName())) { + return true; + } + } + return false; + } + + private static boolean equalTxCoprocessor(TransactionFactory.Provider provider, TableDescriptor existingDesc, TableDescriptor newDesc) { + Class<? extends RegionObserver> coprocessorClass = provider.getTransactionProvider().getCoprocessor(); + return (coprocessorClass != null && existingDesc.hasCoprocessor(coprocessorClass.getName()) && newDesc.hasCoprocessor(coprocessorClass.getName())); +} private void modifyTable(byte[] tableName, TableDescriptor newDesc, boolean shouldPoll) throws IOException, InterruptedException, TimeoutException, SQLException { @@ -1951,6 +1989,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } else { indexTableProps = Maps.newHashMapWithExpectedSize(1); indexTableProps.put(PhoenixTransactionContext.READ_NON_TX_DATA, Boolean.valueOf(txValue)); + indexTableProps.put(PhoenixDatabaseMetaData.TRANSACTION_PROVIDER, tableProps.get(PhoenixDatabaseMetaData.TRANSACTION_PROVIDER)); } for (PTable index : table.getIndexes()) { TableDescriptor indexDesc = admin.getDescriptor(TableName.valueOf(index.getPhysicalName().getBytes())); @@ -2073,6 +2112,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement boolean willBeTransactional = false; boolean isOrWillBeTransactional = isTransactional; Integer newTTL = null; + TransactionFactory.Provider txProvider = null; for (String family : properties.keySet()) { List<Pair<String, Object>> propsList = properties.get(family); if (propsList != null && propsList.size() > 0) { @@ -2083,20 +2123,27 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement if ((MetaDataUtil.isHTableProperty(propName) || TableProperty.isPhoenixTableProperty(propName)) && addingColumns) { // setting HTable and PhoenixTable properties while adding a column is not allowed. throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_SET_TABLE_PROPERTY_ADD_COLUMN) - .setMessage("Property: " + propName).build() + .setMessage("Property: " + propName) + .setSchemaName(table.getSchemaName().getString()) + .setTableName(table.getTableName().getString()) + .build() .buildException(); } if (MetaDataUtil.isHTableProperty(propName)) { // Can't have a column family name for a property that's an HTableProperty if (!family.equals(QueryConstants.ALL_FAMILY_PROPERTIES_KEY)) { throw new SQLExceptionInfo.Builder(SQLExceptionCode.COLUMN_FAMILY_NOT_ALLOWED_TABLE_PROPERTY) - .setMessage("Column Family: " + family + ", Property: " + propName).build() + .setMessage("Column Family: " + family + ", Property: " + propName) + .setSchemaName(table.getSchemaName().getString()) + .setTableName(table.getTableName().getString()) + .build() .buildException(); } tableProps.put(propName, propValue); } else { if (TableProperty.isPhoenixTableProperty(propName)) { - TableProperty.valueOf(propName).validate(true, !family.equals(QueryConstants.ALL_FAMILY_PROPERTIES_KEY), table.getType()); + TableProperty tableProp = TableProperty.valueOf(propName); + tableProp.validate(true, !family.equals(QueryConstants.ALL_FAMILY_PROPERTIES_KEY), table.getType()); if (propName.equals(TTL)) { newTTL = ((Number)prop.getSecond()).intValue(); // Even though TTL is really a HColumnProperty we treat it specially. @@ -2105,6 +2152,11 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } else if (propName.equals(PhoenixDatabaseMetaData.TRANSACTIONAL) && Boolean.TRUE.equals(propValue)) { willBeTransactional = isOrWillBeTransactional = true; tableProps.put(PhoenixTransactionContext.READ_NON_TX_DATA, propValue); + } else if (propName.equals(PhoenixDatabaseMetaData.TRANSACTION_PROVIDER) && propValue != null) { + willBeTransactional = isOrWillBeTransactional = true; + tableProps.put(PhoenixTransactionContext.READ_NON_TX_DATA, Boolean.TRUE); + txProvider = (Provider)TableProperty.TRANSACTION_PROVIDER.getValue(propValue); + tableProps.put(PhoenixDatabaseMetaData.TRANSACTION_PROVIDER, txProvider); } } else { if (MetaDataUtil.isHColumnProperty(propName)) { @@ -2118,12 +2170,26 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement // FIXME: This isn't getting triggered as currently a property gets evaluated // as HTableProp if its neither HColumnProp or PhoenixTableProp. throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_ALTER_PROPERTY) - .setMessage("Column Family: " + family + ", Property: " + propName).build() + .setMessage("Column Family: " + family + ", Property: " + propName) + .setSchemaName(table.getSchemaName().getString()) + .setTableName(table.getTableName().getString()) + .build() .buildException(); } } } } + if (isOrWillBeTransactional && newTTL != null) { + TransactionFactory.Provider isOrWillBeTransactionProvider = txProvider == null ? table.getTransactionProvider() : txProvider; + if (isOrWillBeTransactionProvider.getTransactionProvider().isUnsupported(PhoenixTransactionProvider.Feature.SET_TTL)) { + throw new SQLExceptionInfo.Builder(PhoenixTransactionProvider.Feature.SET_TTL.getCode()) + .setMessage(isOrWillBeTransactionProvider.name()) + .setSchemaName(table.getSchemaName().getString()) + .setTableName(table.getTableName().getString()) + .build() + .buildException(); + } + } if (!colFamilyPropsMap.isEmpty()) { stmtFamiliesPropsMap.put(family, colFamilyPropsMap); } @@ -4761,7 +4827,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } @Override - public synchronized PhoenixTransactionClient initTransactionClient(Provider provider) { + public synchronized PhoenixTransactionClient initTransactionClient(Provider provider) throws SQLException { PhoenixTransactionClient client = txClients[provider.ordinal()]; if (client == null) { client = txClients[provider.ordinal()] = provider.getTransactionProvider().getTransactionClient(config, connectionInfo); http://git-wip-us.apache.org/repos/asf/phoenix/blob/d720f39e/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java index 57ff6e8..57af9e0 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java @@ -368,7 +368,7 @@ public class DelegateConnectionQueryServices extends DelegateQueryServices imple } @Override - public PhoenixTransactionClient initTransactionClient(Provider provider) { + public PhoenixTransactionClient initTransactionClient(Provider provider) throws SQLException { return getDelegate().initTransactionClient(provider); }
