Repository: phoenix Updated Branches: refs/heads/txn 0ad5c5675 -> e1521ab05
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e1521ab0/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java new file mode 100644 index 0000000..dd71a58 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java @@ -0,0 +1,339 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by + * applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language + * governing permissions and limitations under the License. + */ +package org.apache.phoenix.index; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import co.cask.tephra.Transaction; +import co.cask.tephra.hbase98.TransactionAwareHTable; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.CoprocessorEnvironment; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.phoenix.compile.ScanRanges; +import org.apache.phoenix.hbase.index.MultiMutation; +import org.apache.phoenix.hbase.index.ValueGetter; +import org.apache.phoenix.hbase.index.covered.IndexUpdate; +import org.apache.phoenix.hbase.index.covered.TableState; +import org.apache.phoenix.hbase.index.covered.update.ColumnReference; +import org.apache.phoenix.hbase.index.covered.update.ColumnTracker; +import org.apache.phoenix.hbase.index.covered.update.IndexedColumnGroup; +import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; +import org.apache.phoenix.hbase.index.util.IndexManagementUtil; +import org.apache.phoenix.hbase.index.write.IndexWriter; +import org.apache.phoenix.query.KeyRange; +import org.apache.phoenix.schema.types.PVarbinary; +import org.apache.phoenix.trace.TracingUtils; +import org.apache.phoenix.trace.util.NullSpan; +import org.apache.phoenix.util.ScanUtil; +import org.apache.phoenix.util.SchemaUtil; +import org.cloudera.htrace.Span; +import org.cloudera.htrace.Trace; +import org.cloudera.htrace.TraceScope; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; + +/** + * Do all the work of managing index updates for a transactional table from a single coprocessor. Since the transaction + * manager essentially time orders writes through conflict detection, the logic to maintain a secondary index is quite a + * bit simpler than the non transactional case. For example, there's no need to muck with the WAL, as failure scenarios + * are handled by aborting the transaction. + */ +public class PhoenixTransactionalIndexer extends BaseRegionObserver { + + private static final Log LOG = LogFactory.getLog(PhoenixTransactionalIndexer.class); + + private PhoenixIndexCodec codec; + private IndexWriter writer; + private boolean stopped; + + @Override + public void start(CoprocessorEnvironment e) throws IOException { + final RegionCoprocessorEnvironment env = (RegionCoprocessorEnvironment)e; + String serverName = env.getRegionServerServices().getServerName().getServerName(); + codec = new PhoenixIndexCodec(); + codec.initialize(env); + + // setup the actual index writer + this.writer = new IndexWriter(env, serverName + "-tx-index-writer"); + } + + @Override + public void stop(CoprocessorEnvironment e) throws IOException { + if (this.stopped) { return; } + this.stopped = true; + String msg = "TxIndexer is being stopped"; + this.writer.stop(msg); + } + + @Override + public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, + MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException { + + Mutation m = miniBatchOp.getOperation(0); + if (!codec.isEnabled(m)) { + super.preBatchMutate(c, miniBatchOp); + return; + } + + Collection<Pair<Mutation, byte[]>> indexUpdates = null; + // get the current span, or just use a null-span to avoid a bunch of if statements + try (TraceScope scope = Trace.startSpan("Starting to build index updates")) { + Span current = scope.getSpan(); + if (current == null) { + current = NullSpan.INSTANCE; + } + + // get the index updates for all elements in this batch + indexUpdates = getIndexUpdates(c.getEnvironment(), miniBatchOp); + + current.addTimelineAnnotation("Built index updates, doing preStep"); + TracingUtils.addAnnotation(current, "index update count", indexUpdates.size()); + + // no index updates, so we are done + if (!indexUpdates.isEmpty()) { + this.writer.write(indexUpdates); + } + } catch (Throwable t) { + LOG.error("Failed to update index with entries:" + indexUpdates, t); + IndexManagementUtil.rethrowIndexingException(t); + } + } + + private Collection<Pair<Mutation, byte[]>> getIndexUpdates(RegionCoprocessorEnvironment env, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException { + // Collect the set of mutable ColumnReferences so that we can first + // run a scan to get the current state. We'll need this to delete + // the existing index rows. + Map<String,byte[]> updateAttributes = miniBatchOp.getOperation(0).getAttributesMap(); + PhoenixIndexMetaData indexMetaData = new PhoenixIndexMetaData(env,updateAttributes); + Transaction tx = indexMetaData.getTransaction(); + assert(tx != null); + List<IndexMaintainer> indexMaintainers = indexMetaData.getIndexMaintainers(); + Set<ColumnReference> mutableColumns = Sets.newHashSetWithExpectedSize(indexMaintainers.size() * 10); + for (IndexMaintainer indexMaintainer : indexMaintainers) { + if (!indexMaintainer.isImmutableRows()) { + mutableColumns.addAll(indexMaintainer.getAllColumns()); + } + } + ResultScanner scanner = null; + TransactionAwareHTable txTable = null; + + // Collect up all mutations in batch + Map<ImmutableBytesPtr, MultiMutation> mutations = + new HashMap<ImmutableBytesPtr, MultiMutation>(); + for (int i = 0; i < miniBatchOp.size(); i++) { + Mutation m = miniBatchOp.getOperation(i); + // add the mutation to the batch set + ImmutableBytesPtr row = new ImmutableBytesPtr(m.getRow()); + MultiMutation stored = mutations.get(row); + // we haven't seen this row before, so add it + if (stored == null) { + stored = new MultiMutation(row); + mutations.put(row, stored); + } + stored.addAll(m); + } + + try { + if (!mutableColumns.isEmpty()) { + List<KeyRange> keys = Lists.newArrayListWithExpectedSize(mutations.size()); + for (ImmutableBytesPtr ptr : mutations.keySet()) { + keys.add(PVarbinary.INSTANCE.getKeyRange(ptr.copyBytesIfNecessary())); + } + Scan scan = new Scan(); + ScanRanges scanRanges = ScanRanges.create(SchemaUtil.VAR_BINARY_SCHEMA, Collections.singletonList(keys), ScanUtil.SINGLE_COLUMN_SLOT_SPAN); + scanRanges.initializeScan(scan); + scan.setFilter(scanRanges.getSkipScanFilter()); + TableName tableName = env.getRegion().getRegionInfo().getTable(); + HTableInterface htable = env.getTable(tableName); + txTable = new TransactionAwareHTable(htable); + txTable.startTx(tx); + scanner = txTable.getScanner(scan); + } + } finally { + if (txTable != null) txTable.close(); + } + + Collection<Pair<Mutation, byte[]>> indexUpdates = new ArrayList<Pair<Mutation, byte[]>>(mutations.size() * 2 * indexMaintainers.size()); + if (scanner == null) { + for (Mutation m : mutations.values()) { + TxTableState state = new TxTableState(env, mutableColumns, updateAttributes, tx.getWritePointer(), m); + state.applyMutation(m); + Iterable<IndexUpdate> updates = codec.getIndexUpserts(state, indexMetaData); + for (IndexUpdate update : updates) { + indexUpdates.add(new Pair<Mutation, byte[]>(update.getUpdate(),update.getTableName())); + } + } + } else { + Result result; + while ((result = scanner.next()) != null) { + TxTableState state = new TxTableState(env, mutableColumns, updateAttributes, tx.getWritePointer(), result); + Iterable<IndexUpdate> deletes = codec.getIndexDeletes(state, indexMetaData); + for (IndexUpdate delete : deletes) { + indexUpdates.add(new Pair<Mutation, byte[]>(delete.getUpdate(),delete.getTableName())); + } + Mutation m = mutations.get(new ImmutableBytesPtr(result.getRow())); + state.applyMutation(m); + Iterable<IndexUpdate> updates = codec.getIndexUpserts(state, indexMetaData); + for (IndexUpdate update : updates) { + indexUpdates.add(new Pair<Mutation, byte[]>(update.getUpdate(),update.getTableName())); + } + } + } + return indexUpdates; + } + + + private static class TxTableState implements TableState { + private final byte[] rowKey; + private final long currentTimestamp; + private final RegionCoprocessorEnvironment env; + private final Map<String, byte[]> attributes; + private final List<Cell> pendingUpdates; + private final Set<ColumnReference> indexedColumns; + private final Map<ColumnReference, ImmutableBytesWritable> valueMap; + + private TxTableState(RegionCoprocessorEnvironment env, Set<ColumnReference> indexedColumns, Map<String, byte[]> attributes, long currentTimestamp, byte[] rowKey) { + this.env = env; + this.currentTimestamp = currentTimestamp; + this.indexedColumns = indexedColumns; + this.attributes = attributes; + this.rowKey = rowKey; + int estimatedSize = indexedColumns.size(); + this.valueMap = Maps.newHashMapWithExpectedSize(estimatedSize); + this.pendingUpdates = Lists.newArrayListWithExpectedSize(estimatedSize); + } + + public TxTableState(RegionCoprocessorEnvironment env, Set<ColumnReference> indexedColumns, Map<String, byte[]> attributes, long currentTimestamp, Mutation m) { + this(env, indexedColumns, attributes, currentTimestamp, m.getRow()); + applyMutation(m); + } + + public TxTableState(RegionCoprocessorEnvironment env, Set<ColumnReference> indexedColumns, Map<String, byte[]> attributes, long currentTimestamp, Result r) { + this(env, indexedColumns, attributes, currentTimestamp, r.getRow()); + + for (ColumnReference ref : indexedColumns) { + Cell cell = r.getColumnLatestCell(ref.getFamily(), ref.getQualifier()); + if (cell != null) { + ImmutableBytesWritable ptr = new ImmutableBytesWritable(); + ptr.set(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()); + valueMap.put(ref, ptr); + } + } + } + + @Override + public RegionCoprocessorEnvironment getEnvironment() { + return env; + } + + @Override + public long getCurrentTimestamp() { + return currentTimestamp; + } + + @Override + public Map<String, byte[]> getUpdateAttributes() { + return attributes; + } + + @Override + public byte[] getCurrentRowKey() { + return rowKey; + } + + @Override + public List<? extends IndexedColumnGroup> getIndexColumnHints() { + return Collections.emptyList(); + } + + public void applyMutation(Mutation m) { + if (m instanceof Delete) { + valueMap.clear(); + } else { + CellScanner scanner = m.cellScanner(); + try { + while (scanner.advance()) { + Cell cell = scanner.current(); + if (cell.getTypeByte() == KeyValue.Type.DeleteColumn.getCode()) { + ColumnReference ref = new ColumnReference(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(), cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()); + valueMap.remove(ref); + } else if (cell.getTypeByte() == KeyValue.Type.DeleteFamily.getCode()) { + for (ColumnReference ref : indexedColumns) { + if (ref.matchesFamily(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength())) { + valueMap.remove(ref); + } + } + } else { + ColumnReference ref = new ColumnReference(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(), cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()); + ImmutableBytesWritable ptr = new ImmutableBytesWritable(); + ptr.set(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()); + valueMap.put(ref, ptr); + } + } + } catch (IOException e) { + throw new RuntimeException(e); // Impossible + } + } + } + + @Override + public Collection<Cell> getPendingUpdate() { + return pendingUpdates; + } + + @Override + public Pair<ValueGetter, IndexUpdate> getIndexUpdateState(Collection<? extends ColumnReference> indexedColumns) + throws IOException { + // TODO: creating these objects over and over again is wasteful + ColumnTracker tracker = new ColumnTracker(indexedColumns); + ValueGetter getter = new ValueGetter() { + + @Override + public ImmutableBytesWritable getLatestValue(ColumnReference ref) throws IOException { + return valueMap.get(ref); + } + + @Override + public byte[] getRowKey() { + return rowKey; + } + + }; + Pair<ValueGetter, IndexUpdate> pair = new Pair<ValueGetter, IndexUpdate>(getter, new IndexUpdate(tracker)); + return pair; + } + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/e1521ab0/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTxIndexBuilder.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTxIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTxIndexBuilder.java deleted file mode 100644 index c471df6..0000000 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTxIndexBuilder.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.phoenix.index; - -import java.io.IOException; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.client.Mutation; -import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; -import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; -import org.apache.phoenix.hbase.index.covered.TxIndexBuilder; -import org.apache.phoenix.hbase.index.write.IndexWriter; -import org.apache.phoenix.util.IndexUtil; - -public class PhoenixTxIndexBuilder extends TxIndexBuilder { - @Override - public void setup(RegionCoprocessorEnvironment env) throws IOException { - super.setup(env); - Configuration conf = env.getConfiguration(); - // Install failure policy that just re-throws exception instead of killing RS - // or disabling the index - conf.set(IndexWriter.INDEX_FAILURE_POLICY_CONF_KEY, PhoenixTxIndexFailurePolicy.class.getName()); - } - - @Override - public void batchStarted(MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException { - // The entire purpose of this method impl is to get the existing rows for the - // table rows being indexed into the block cache, as the index maintenance code - // does a point scan per row. - // TODO: provide a means for the transactional case to just return the Scanner - // for when this is executed as it seems like that would be more efficient. - IndexUtil.loadMutatingRowsIntoBlockCache(this.env.getRegion(), getCodec(), miniBatchOp, useRawScanToPrimeBlockCache()); - } - - private PhoenixIndexCodec getCodec() { - return (PhoenixIndexCodec)this.codec; - } -} http://git-wip-us.apache.org/repos/asf/phoenix/blob/e1521ab0/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTxIndexFailurePolicy.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTxIndexFailurePolicy.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTxIndexFailurePolicy.java deleted file mode 100644 index fa70cc9..0000000 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTxIndexFailurePolicy.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE - * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the - * License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by - * applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language - * governing permissions and limitations under the License. - */ -package org.apache.phoenix.index; - -import java.io.IOException; - -import org.apache.hadoop.hbase.Stoppable; -import org.apache.hadoop.hbase.client.Mutation; -import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; -import org.apache.phoenix.hbase.index.table.HTableInterfaceReference; -import org.apache.phoenix.hbase.index.write.IndexFailurePolicy; - -import com.google.common.collect.Multimap; - -public class PhoenixTxIndexFailurePolicy implements IndexFailurePolicy { - private Stoppable parent; - - @Override - public void stop(String why) { - if (parent != null) { - parent.stop(why); - } - } - - @Override - public boolean isStopped() { - return parent == null ? false : parent.isStopped(); - } - - @Override - public void setup(Stoppable parent, RegionCoprocessorEnvironment env) { - this.parent = parent; - } - - @Override - public void handleFailure(Multimap<HTableInterfaceReference, Mutation> attempted, Exception cause) - throws IOException { - if (cause instanceof IOException) { - throw (IOException)cause; - } else if (cause instanceof RuntimeException) { throw (RuntimeException)cause; } - throw new IOException(cause); - } -} http://git-wip-us.apache.org/repos/asf/phoenix/blob/e1521ab0/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java index dae7939..20ab602 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java @@ -112,7 +112,7 @@ import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.hbase.index.util.KeyValueBuilder; import org.apache.phoenix.index.PhoenixIndexBuilder; import org.apache.phoenix.index.PhoenixIndexCodec; -import org.apache.phoenix.index.PhoenixTxIndexBuilder; +import org.apache.phoenix.index.PhoenixTransactionalIndexer; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo; @@ -705,11 +705,18 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement // all-or-none mutate class which break when this coprocessor is installed (PHOENIX-1318). if ((tableType != PTableType.INDEX && tableType != PTableType.VIEW) && !SchemaUtil.isMetaTable(tableName) - && !SchemaUtil.isStatsTable(tableName) - && !descriptor.hasCoprocessor(Indexer.class.getName())) { - Map<String, String> opts = Maps.newHashMapWithExpectedSize(1); - opts.put(NonTxIndexBuilder.CODEC_CLASS_NAME_KEY, PhoenixIndexCodec.class.getName()); - Indexer.enableIndexing(descriptor, isTransactional ? PhoenixTxIndexBuilder.class : PhoenixIndexBuilder.class, opts, priority); + && !SchemaUtil.isStatsTable(tableName)) { + if (isTransactional) { + if (!descriptor.hasCoprocessor(PhoenixTransactionalIndexer.class.getName())) { + descriptor.addCoprocessor(PhoenixTransactionalIndexer.class.getName(), null, priority, null); + } + } else { + if (!descriptor.hasCoprocessor(Indexer.class.getName())) { + Map<String, String> opts = Maps.newHashMapWithExpectedSize(1); + opts.put(NonTxIndexBuilder.CODEC_CLASS_NAME_KEY, PhoenixIndexCodec.class.getName()); + Indexer.enableIndexing(descriptor, PhoenixIndexBuilder.class, opts, priority); + } + } } if (SchemaUtil.isStatsTable(tableName) && !descriptor.hasCoprocessor(MultiRowMutationEndpoint.class.getName())) { descriptor.addCoprocessor(MultiRowMutationEndpoint.class.getName(), http://git-wip-us.apache.org/repos/asf/phoenix/blob/e1521ab0/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java index b317d77..8c2e990 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java @@ -100,6 +100,7 @@ import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.util.Bytes; import org.apache.phoenix.coprocessor.MetaDataProtocol; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; +import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; import org.apache.phoenix.schema.MetaDataSplitPolicy; import org.apache.phoenix.schema.PName; import org.apache.phoenix.schema.PNameFactory; @@ -238,7 +239,8 @@ public interface QueryConstants { HConstants.VERSIONS + "=" + MetaDataProtocol.DEFAULT_MAX_META_DATA_VERSIONS + ",\n" + HColumnDescriptor.KEEP_DELETED_CELLS + "=" + MetaDataProtocol.DEFAULT_META_DATA_KEEP_DELETED_CELLS + ",\n" + // Install split policy to prevent a tenant's metadata from being split across regions. - HTableDescriptor.SPLIT_POLICY + "='" + MetaDataSplitPolicy.class.getName() + "'\n"; + HTableDescriptor.SPLIT_POLICY + "='" + MetaDataSplitPolicy.class.getName() + "',\n" + + PhoenixDatabaseMetaData.TRANSACTIONAL + "=" + Boolean.FALSE; public static final String CREATE_STATS_TABLE_METADATA = "CREATE TABLE " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_STATS_TABLE + "\"(\n" + @@ -259,7 +261,8 @@ public interface QueryConstants { HConstants.VERSIONS + "=" + MetaDataProtocol.DEFAULT_MAX_STAT_DATA_VERSIONS + ",\n" + HColumnDescriptor.KEEP_DELETED_CELLS + "=" + MetaDataProtocol.DEFAULT_META_DATA_KEEP_DELETED_CELLS + ",\n" + // Install split policy to prevent a physical table's stats from being split across regions. - HTableDescriptor.SPLIT_POLICY + "='" + MetaDataSplitPolicy.class.getName() + "'\n"; + HTableDescriptor.SPLIT_POLICY + "='" + MetaDataSplitPolicy.class.getName() + "',\n" + + PhoenixDatabaseMetaData.TRANSACTIONAL + "=" + Boolean.FALSE; public static final String CREATE_SEQUENCE_METADATA = "CREATE TABLE " + SYSTEM_CATALOG_SCHEMA + ".\"" + TYPE_SEQUENCE + "\"(\n" + @@ -277,5 +280,6 @@ public interface QueryConstants { LIMIT_REACHED_FLAG + " BOOLEAN \n" + " CONSTRAINT " + SYSTEM_TABLE_PK_NAME + " PRIMARY KEY (" + TENANT_ID + "," + SEQUENCE_SCHEMA + "," + SEQUENCE_NAME + "))\n" + HConstants.VERSIONS + "=" + MetaDataProtocol.DEFAULT_MAX_META_DATA_VERSIONS + ",\n" + - HColumnDescriptor.KEEP_DELETED_CELLS + "=" + MetaDataProtocol.DEFAULT_META_DATA_KEEP_DELETED_CELLS + "\n"; + HColumnDescriptor.KEEP_DELETED_CELLS + "=" + MetaDataProtocol.DEFAULT_META_DATA_KEEP_DELETED_CELLS + ",\n" + + PhoenixDatabaseMetaData.TRANSACTIONAL + "=" + Boolean.FALSE; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/e1521ab0/phoenix-core/src/main/java/org/apache/phoenix/trace/PhoenixMetricsSink.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/trace/PhoenixMetricsSink.java b/phoenix-core/src/main/java/org/apache/phoenix/trace/PhoenixMetricsSink.java index 265fc78..ee06179 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/trace/PhoenixMetricsSink.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/trace/PhoenixMetricsSink.java @@ -17,12 +17,23 @@ */ package org.apache.phoenix.trace; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Function; -import com.google.common.base.Joiner; -import com.google.common.base.Preconditions; -import com.google.common.collect.Iterators; -import org.apache.commons.configuration.Configuration; +import static org.apache.phoenix.metrics.MetricInfo.ANNOTATION; +import static org.apache.phoenix.metrics.MetricInfo.DESCRIPTION; +import static org.apache.phoenix.metrics.MetricInfo.END; +import static org.apache.phoenix.metrics.MetricInfo.HOSTNAME; +import static org.apache.phoenix.metrics.MetricInfo.PARENT; +import static org.apache.phoenix.metrics.MetricInfo.SPAN; +import static org.apache.phoenix.metrics.MetricInfo.START; +import static org.apache.phoenix.metrics.MetricInfo.TAG; +import static org.apache.phoenix.metrics.MetricInfo.TRACE; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; + import org.apache.commons.configuration.SubsetConfiguration; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -31,20 +42,16 @@ import org.apache.hadoop.metrics2.AbstractMetric; import org.apache.hadoop.metrics2.MetricsRecord; import org.apache.hadoop.metrics2.MetricsSink; import org.apache.hadoop.metrics2.MetricsTag; -import org.apache.phoenix.metrics.*; +import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; +import org.apache.phoenix.metrics.MetricInfo; +import org.apache.phoenix.metrics.Metrics; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.trace.util.Tracing; import org.apache.phoenix.util.QueryUtil; -import javax.annotation.Nullable; -import java.sql.Connection; -import java.sql.PreparedStatement; -import java.sql.SQLException; -import java.util.*; - -import static org.apache.phoenix.metrics.MetricInfo.*; -import static org.apache.phoenix.metrics.MetricInfo.HOSTNAME; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Joiner; /** * Write the metrics to a phoenix table. @@ -169,7 +176,9 @@ public class PhoenixMetricsSink implements MetricsSink { TAG_COUNT + " smallint, " + ANNOTATION_COUNT + " smallint" + " CONSTRAINT pk PRIMARY KEY (" + TRACE.columnName + ", " - + PARENT.columnName + ", " + SPAN.columnName + "))\n"; + + PARENT.columnName + ", " + SPAN.columnName + "))\n" + + PhoenixDatabaseMetaData.TRANSACTIONAL + "=" + Boolean.FALSE; +; PreparedStatement stmt = conn.prepareStatement(ddl); stmt.execute(); this.table = table; http://git-wip-us.apache.org/repos/asf/phoenix/blob/e1521ab0/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java index 1fcf16a..45729a2 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java @@ -24,8 +24,6 @@ import java.io.DataInputStream; import java.io.IOException; import java.sql.SQLException; import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; @@ -43,15 +41,12 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.regionserver.HRegion; -import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; -import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.regionserver.RegionServerServices; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.WritableUtils; import org.apache.phoenix.compile.ColumnResolver; import org.apache.phoenix.compile.FromCompiler; import org.apache.phoenix.compile.IndexStatementRewriter; -import org.apache.phoenix.compile.ScanRanges; import org.apache.phoenix.compile.StatementContext; import org.apache.phoenix.compile.WhereCompiler; import org.apache.phoenix.coprocessor.BaseScannerRegionObserver; @@ -65,16 +60,13 @@ import org.apache.phoenix.expression.visitor.RowKeyExpressionVisitor; import org.apache.phoenix.hbase.index.ValueGetter; import org.apache.phoenix.hbase.index.covered.update.ColumnReference; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; -import org.apache.phoenix.hbase.index.util.IndexManagementUtil; import org.apache.phoenix.hbase.index.util.KeyValueBuilder; import org.apache.phoenix.index.IndexMaintainer; -import org.apache.phoenix.index.PhoenixIndexCodec; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixStatement; import org.apache.phoenix.parse.ParseNode; import org.apache.phoenix.parse.SQLParser; import org.apache.phoenix.parse.SelectStatement; -import org.apache.phoenix.query.KeyRange; import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.schema.ColumnFamilyNotFoundException; import org.apache.phoenix.schema.ColumnNotFoundException; @@ -644,58 +636,4 @@ public class IndexUtil { return col.getExpressionStr() == null ? IndexUtil.getCaseSensitiveDataColumnFullName(col.getName().getString()) : col.getExpressionStr(); } - - /* - * The entire purpose of this method is to get the existing rows for the table rows being indexed into the - * block cache, as the index maintenance code does a point scan per row. Though for the transactional - * case we may be loading more than we need, since we're not applying the transaction filters, that - * should still be ok. - */ - public static void loadMutatingRowsIntoBlockCache(HRegion region, PhoenixIndexCodec codec, MiniBatchOperationInProgress<Mutation> miniBatchOp, boolean useRawScan) - throws IOException { - List<KeyRange> keys = Lists.newArrayListWithExpectedSize(miniBatchOp.size()); - Map<ImmutableBytesWritable, IndexMaintainer> maintainers = - new HashMap<ImmutableBytesWritable, IndexMaintainer>(); - ImmutableBytesWritable indexTableName = new ImmutableBytesWritable(); - for (int i = 0; i < miniBatchOp.size(); i++) { - Mutation m = miniBatchOp.getOperation(i); - keys.add(PVarbinary.INSTANCE.getKeyRange(m.getRow())); - List<IndexMaintainer> indexMaintainers = codec.getIndexMetaData(m.getAttributesMap()).getIndexMaintainers(); - - for(IndexMaintainer indexMaintainer: indexMaintainers) { - if (indexMaintainer.isImmutableRows() && indexMaintainer.isLocalIndex()) continue; - indexTableName.set(indexMaintainer.getIndexTableName()); - if (maintainers.get(indexTableName) != null) continue; - maintainers.put(indexTableName, indexMaintainer); - } - - } - if (maintainers.isEmpty()) return; - Scan scan = IndexManagementUtil.newLocalStateScan(new ArrayList<IndexMaintainer>(maintainers.values())); - scan.setRaw(useRawScan); - ScanRanges scanRanges = ScanRanges.create(SchemaUtil.VAR_BINARY_SCHEMA, Collections.singletonList(keys), ScanUtil.SINGLE_COLUMN_SLOT_SPAN); - scanRanges.initializeScan(scan); - scan.setFilter(scanRanges.getSkipScanFilter()); - RegionScanner scanner = region.getScanner(scan); - // Run through the scanner using internal nextRaw method - region.startRegionOperation(); - try { - synchronized (scanner) { - boolean hasMore; - do { - List<Cell> results = Lists.newArrayList(); - // Results are potentially returned even when the return value of s.next is - // false since this is an indication of whether or not there are more values - // after the ones returned - hasMore = scanner.nextRaw(results); - } while (hasMore); - } - } finally { - try { - scanner.close(); - } finally { - region.closeRegionOperation(); - } - } - } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/e1521ab0/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/CoveredIndexCodecForTesting.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/CoveredIndexCodecForTesting.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/CoveredIndexCodecForTesting.java index 7e73a81..0575b3f 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/CoveredIndexCodecForTesting.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/CoveredIndexCodecForTesting.java @@ -16,7 +16,7 @@ import java.util.List; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; -import org.apache.phoenix.index.BaseIndexCodec; +import org.apache.phoenix.hbase.index.builder.BaseIndexCodec; /** * An {@link IndexCodec} for testing that allow you to specify the index updates/deletes, regardless of the current @@ -41,12 +41,12 @@ public class CoveredIndexCodecForTesting extends BaseIndexCodec { } @Override - public Iterable<IndexUpdate> getIndexDeletes(TableState state) { + public Iterable<IndexUpdate> getIndexDeletes(TableState state, IndexMetaData context) { return this.deletes; } @Override - public Iterable<IndexUpdate> getIndexUpserts(TableState state) { + public Iterable<IndexUpdate> getIndexUpserts(TableState state, IndexMetaData context) { return this.updates; } @@ -59,7 +59,4 @@ public class CoveredIndexCodecForTesting extends BaseIndexCodec { public boolean isEnabled(Mutation m) { return true; } - - @Override - public void setContext(TableState state, Mutation mutation) throws IOException {} } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/e1521ab0/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/example/TestCoveredColumnIndexCodec.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/example/TestCoveredColumnIndexCodec.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/example/TestCoveredColumnIndexCodec.java index f0c1483..e4654ea 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/example/TestCoveredColumnIndexCodec.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/example/TestCoveredColumnIndexCodec.java @@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.phoenix.hbase.index.covered.IndexMetaData; import org.apache.phoenix.hbase.index.covered.IndexCodec; import org.apache.phoenix.hbase.index.covered.IndexUpdate; import org.apache.phoenix.hbase.index.covered.LocalTableState; @@ -180,14 +181,14 @@ public class TestCoveredColumnIndexCodec { // check the codec for deletes it should send LocalTableState state = new LocalTableState(env, table, p); - Iterable<IndexUpdate> updates = codec.getIndexDeletes(state); + Iterable<IndexUpdate> updates = codec.getIndexDeletes(state, IndexMetaData.NULL_INDEX_META_DATA); assertFalse("Found index updates without any existing kvs in table!", updates.iterator().next() .isValid()); // get the updates with the pending update state.setCurrentTimestamp(1); state.addPendingUpdates(kvs); - updates = codec.getIndexUpserts(state); + updates = codec.getIndexUpserts(state, IndexMetaData.NULL_INDEX_META_DATA); assertTrue("Didn't find index updates for pending primary table update!", updates.iterator() .hasNext()); for (IndexUpdate update : updates) { @@ -210,7 +211,7 @@ public class TestCoveredColumnIndexCodec { state = new LocalTableState(env, table, d); state.setCurrentTimestamp(2); // check the cleanup of the current table, after the puts (mocking a 'next' update) - updates = codec.getIndexDeletes(state); + updates = codec.getIndexDeletes(state, IndexMetaData.NULL_INDEX_META_DATA); for (IndexUpdate update : updates) { assertTrue("Didn't have any index cleanup, even though there is current state", update.isValid()); @@ -240,7 +241,7 @@ public class TestCoveredColumnIndexCodec { state.setCurrentTimestamp(d.getTimeStamp()); // now we shouldn't see anything when getting the index update state.addPendingUpdates(d.getFamilyCellMap().get(FAMILY)); - Iterable<IndexUpdate> updates = codec.getIndexUpserts(state); + Iterable<IndexUpdate> updates = codec.getIndexUpserts(state, IndexMetaData.NULL_INDEX_META_DATA); for (IndexUpdate update : updates) { assertFalse("Had some index updates, though it should have been covered by the delete", update.isValid());
