http://git-wip-us.apache.org/repos/asf/phoenix/blob/7d3b544a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java ---------------------------------------------------------------------- diff --cc phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java index 9f03747,0000000..5c7d32f mode 100644,000000..100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java @@@ -1,445 -1,0 +1,447 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by + * applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language + * governing permissions and limitations under the License. + */ +package org.apache.phoenix.index; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; + +import co.cask.tephra.Transaction; +import co.cask.tephra.Transaction.VisibilityLevel; +import co.cask.tephra.TxConstants; +import co.cask.tephra.hbase98.TransactionAwareHTable; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.CoprocessorEnvironment; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.KeyValue; ++import org.apache.hadoop.hbase.KeyValueUtil; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.io.TimeRange; +import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; +import org.apache.hadoop.hbase.regionserver.RegionScanner; +import org.apache.hadoop.hbase.util.Pair; ++import org.apache.htrace.Span; ++import org.apache.htrace.Trace; ++import org.apache.htrace.TraceScope; +import org.apache.phoenix.compile.ScanRanges; +import org.apache.phoenix.hbase.index.MultiMutation; +import org.apache.phoenix.hbase.index.ValueGetter; +import org.apache.phoenix.hbase.index.covered.IndexUpdate; +import org.apache.phoenix.hbase.index.covered.TableState; +import org.apache.phoenix.hbase.index.covered.update.ColumnReference; +import org.apache.phoenix.hbase.index.covered.update.ColumnTracker; +import org.apache.phoenix.hbase.index.covered.update.IndexedColumnGroup; +import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; +import org.apache.phoenix.hbase.index.write.IndexWriter; +import org.apache.phoenix.query.KeyRange; +import org.apache.phoenix.query.QueryConstants; +import org.apache.phoenix.schema.types.PDataType; +import org.apache.phoenix.schema.types.PVarbinary; +import org.apache.phoenix.trace.TracingUtils; +import org.apache.phoenix.trace.util.NullSpan; +import org.apache.phoenix.util.ScanUtil; +import org.apache.phoenix.util.SchemaUtil; +import org.apache.phoenix.util.ServerUtil; - import org.cloudera.htrace.Span; - import org.cloudera.htrace.Trace; - import org.cloudera.htrace.TraceScope; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; ++import com.google.inject.Key; + +/** + * Do all the work of managing index updates for a transactional table from a single coprocessor. Since the transaction + * manager essentially time orders writes through conflict detection, the logic to maintain a secondary index is quite a + * bit simpler than the non transactional case. For example, there's no need to muck with the WAL, as failure scenarios + * are handled by aborting the transaction. + */ +public class PhoenixTransactionalIndexer extends BaseRegionObserver { + + private static final Log LOG = LogFactory.getLog(PhoenixTransactionalIndexer.class); + + private PhoenixIndexCodec codec; + private IndexWriter writer; + private boolean stopped; + + @Override + public void start(CoprocessorEnvironment e) throws IOException { + final RegionCoprocessorEnvironment env = (RegionCoprocessorEnvironment)e; + String serverName = env.getRegionServerServices().getServerName().getServerName(); + codec = new PhoenixIndexCodec(); + codec.initialize(env); + + // setup the actual index writer + this.writer = new IndexWriter(env, serverName + "-tx-index-writer"); + } + + @Override + public void stop(CoprocessorEnvironment e) throws IOException { + if (this.stopped) { return; } + this.stopped = true; + String msg = "TxIndexer is being stopped"; + this.writer.stop(msg); + } + + private static Iterator<Mutation> getMutationIterator(final MiniBatchOperationInProgress<Mutation> miniBatchOp) { + return new Iterator<Mutation>() { + private int i = 0; + + @Override + public boolean hasNext() { + return i < miniBatchOp.size(); + } + + @Override + public Mutation next() { + return miniBatchOp.getOperation(i++); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + + }; + } + @Override + public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, + MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException { + + Mutation m = miniBatchOp.getOperation(0); + if (!codec.isEnabled(m)) { + super.preBatchMutate(c, miniBatchOp); + return; + } + + Map<String,byte[]> updateAttributes = m.getAttributesMap(); + PhoenixIndexMetaData indexMetaData = new PhoenixIndexMetaData(c.getEnvironment(),updateAttributes); + byte[] txRollbackAttribute = m.getAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY); + Collection<Pair<Mutation, byte[]>> indexUpdates = null; + // get the current span, or just use a null-span to avoid a bunch of if statements + try (TraceScope scope = Trace.startSpan("Starting to build index updates")) { + Span current = scope.getSpan(); + if (current == null) { + current = NullSpan.INSTANCE; + } + + // get the index updates for all elements in this batch + indexUpdates = getIndexUpdates(c.getEnvironment(), indexMetaData, getMutationIterator(miniBatchOp), txRollbackAttribute); + + current.addTimelineAnnotation("Built index updates, doing preStep"); + TracingUtils.addAnnotation(current, "index update count", indexUpdates.size()); + + // no index updates, so we are done + if (!indexUpdates.isEmpty()) { + this.writer.write(indexUpdates); + } + } catch (Throwable t) { + String msg = "Failed to update index with entries:" + indexUpdates; + LOG.error(msg, t); + ServerUtil.throwIOException(msg, t); + } + } + + private Collection<Pair<Mutation, byte[]>> getIndexUpdates(RegionCoprocessorEnvironment env, PhoenixIndexMetaData indexMetaData, Iterator<Mutation> mutationIterator, byte[] txRollbackAttribute) throws IOException { + ResultScanner currentScanner = null; + ResultScanner previousScanner = null; + TransactionAwareHTable txTable = null; + // Collect up all mutations in batch + Map<ImmutableBytesPtr, MultiMutation> mutations = + new HashMap<ImmutableBytesPtr, MultiMutation>(); + while(mutationIterator.hasNext()) { + Mutation m = mutationIterator.next(); + // add the mutation to the batch set + ImmutableBytesPtr row = new ImmutableBytesPtr(m.getRow()); + MultiMutation stored = mutations.get(row); + // we haven't seen this row before, so add it + if (stored == null) { + stored = new MultiMutation(row); + mutations.put(row, stored); + } + stored.addAll(m); + } + + // Collect the set of mutable ColumnReferences so that we can first + // run a scan to get the current state. We'll need this to delete + // the existing index rows. + Transaction tx = indexMetaData.getTransaction(); + assert(tx != null); + List<IndexMaintainer> indexMaintainers = indexMetaData.getIndexMaintainers(); + Set<ColumnReference> mutableColumns = Sets.newHashSetWithExpectedSize(indexMaintainers.size() * 10); + for (IndexMaintainer indexMaintainer : indexMaintainers) { + // For transactional tables, we use an index maintainer + // to aid in rollback if there's a KeyValue column in the index. The alternative would be + // to hold on to all uncommitted index row keys (even ones already sent to HBase) on the + // client side. + mutableColumns.addAll(indexMaintainer.getAllColumns()); + } + + Collection<Pair<Mutation, byte[]>> indexUpdates = new ArrayList<Pair<Mutation, byte[]>>(mutations.size() * 2 * indexMaintainers.size()); + try { + if (!mutableColumns.isEmpty()) { + List<KeyRange> keys = Lists.newArrayListWithExpectedSize(mutations.size()); + for (ImmutableBytesPtr ptr : mutations.keySet()) { + keys.add(PVarbinary.INSTANCE.getKeyRange(ptr.copyBytesIfNecessary())); + } + Scan scan = new Scan(); + // Project all mutable columns + for (ColumnReference ref : mutableColumns) { + scan.addColumn(ref.getFamily(), ref.getQualifier()); + } + // Project empty key value column + scan.addColumn(indexMaintainers.get(0).getDataEmptyKeyValueCF(), QueryConstants.EMPTY_COLUMN_BYTES); - ScanRanges scanRanges = ScanRanges.create(SchemaUtil.VAR_BINARY_SCHEMA, Collections.singletonList(keys), ScanUtil.SINGLE_COLUMN_SLOT_SPAN); ++ ScanRanges scanRanges = ScanRanges.create(SchemaUtil.VAR_BINARY_SCHEMA, Collections.singletonList(keys), ScanUtil.SINGLE_COLUMN_SLOT_SPAN, KeyRange.EVERYTHING_RANGE, null, true, -1); + scanRanges.initializeScan(scan); + scan.setFilter(scanRanges.getSkipScanFilter()); + TableName tableName = env.getRegion().getRegionInfo().getTable(); + HTableInterface htable = env.getTable(tableName); + txTable = new TransactionAwareHTable(htable); + txTable.startTx(tx); + currentScanner = txTable.getScanner(scan); + if (txRollbackAttribute!=null) { + tx.setVisibility(VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT); + previousScanner = txTable.getScanner(scan); + } + } + // In case of rollback we have to do two scans, one with VisibilityLevel.SNAPSHOT to see the current state of the row + // and another with VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT to see the previous state of the row + // so that we can rollback a previous delete + put + processScanner(env, indexMetaData, txRollbackAttribute, previousScanner, mutations, tx, mutableColumns, indexUpdates, false); + processScanner(env, indexMetaData, txRollbackAttribute, currentScanner, mutations, tx, mutableColumns, indexUpdates, true); + for (Mutation m : mutations.values()) { + long timestamp = getTimestamp(txRollbackAttribute, tx.getWritePointer(), m); + TxTableState state = new TxTableState(env, mutableColumns, indexMetaData.getAttributes(), timestamp, m); + // if we did not generate valid put, we might have to generate a delete + if (!generatePuts(indexMetaData, indexUpdates, state)) { + generateDeletes(indexMetaData, indexUpdates, txRollbackAttribute, state); + } + } + } finally { + if (txTable != null) txTable.close(); + } + + return indexUpdates; + } + + private void processScanner(RegionCoprocessorEnvironment env, + PhoenixIndexMetaData indexMetaData, byte[] txRollbackAttribute, + ResultScanner scanner, + Map<ImmutableBytesPtr, MultiMutation> mutations, Transaction tx, + Set<ColumnReference> mutableColumns, + Collection<Pair<Mutation, byte[]>> indexUpdates, boolean removeMutation) throws IOException { + if (scanner != null) { + Result result; + ColumnReference emptyColRef = new ColumnReference(indexMetaData.getIndexMaintainers().get(0).getDataEmptyKeyValueCF(), QueryConstants.EMPTY_COLUMN_BYTES); + while ((result = scanner.next()) != null) { + Mutation m = removeMutation ? mutations.remove(new ImmutableBytesPtr(result.getRow())) : mutations.get(new ImmutableBytesPtr(result.getRow())); + long timestamp = getTimestamp(txRollbackAttribute, tx.getWritePointer(), m); + TxTableState state = new TxTableState(env, mutableColumns, indexMetaData.getAttributes(), timestamp, m, emptyColRef, result); + generateDeletes(indexMetaData, indexUpdates, txRollbackAttribute, state); + generatePuts(indexMetaData, indexUpdates, state); + } + } + } + + private long getTimestamp(byte[] txRollbackAttribute, long txnWritePointer, Mutation m) { + if (txRollbackAttribute==null) { + return txnWritePointer; + } + // if this is a rollback generate mutations with the same timestamp as the data row mutation as the timestamp might be + // different from the current txn write pointer because of check points + long mutationTimestamp = txnWritePointer; + for (Entry<byte[], List<Cell>> entry : m.getFamilyCellMap().entrySet()) { + mutationTimestamp = entry.getValue().get(0).getTimestamp(); + break; + } + return mutationTimestamp; + } + + private void generateDeletes(PhoenixIndexMetaData indexMetaData, + Collection<Pair<Mutation, byte[]>> indexUpdates, + byte[] attribValue, TxTableState state) throws IOException { + Iterable<IndexUpdate> deletes = codec.getIndexDeletes(state, indexMetaData); + for (IndexUpdate delete : deletes) { + if (delete.isValid()) { + delete.getUpdate().setAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY, attribValue); + indexUpdates.add(new Pair<Mutation, byte[]>(delete.getUpdate(),delete.getTableName())); + } + } + } + + boolean generatePuts( + PhoenixIndexMetaData indexMetaData, + Collection<Pair<Mutation, byte[]>> indexUpdates, + TxTableState state) + throws IOException { + state.applyMutation(); + Iterable<IndexUpdate> puts = codec.getIndexUpserts(state, indexMetaData); + boolean validPut = false; + for (IndexUpdate put : puts) { + if (put.isValid()) { + indexUpdates.add(new Pair<Mutation, byte[]>(put.getUpdate(),put.getTableName())); + validPut = true; + } + } + return validPut; + } + + + private static class TxTableState implements TableState { + private final Mutation mutation; + private final long currentTimestamp; + private final RegionCoprocessorEnvironment env; + private final Map<String, byte[]> attributes; - private final List<Cell> pendingUpdates; ++ private final List<KeyValue> pendingUpdates; + private final Set<ColumnReference> indexedColumns; + private final Map<ColumnReference, ImmutableBytesWritable> valueMap; + + private TxTableState(RegionCoprocessorEnvironment env, Set<ColumnReference> indexedColumns, Map<String, byte[]> attributes, long currentTimestamp, Mutation mutation) { + this.env = env; + this.currentTimestamp = currentTimestamp; + this.indexedColumns = indexedColumns; + this.attributes = attributes; + this.mutation = mutation; + int estimatedSize = indexedColumns.size(); + this.valueMap = Maps.newHashMapWithExpectedSize(estimatedSize); + this.pendingUpdates = Lists.newArrayListWithExpectedSize(estimatedSize); + try { + CellScanner scanner = mutation.cellScanner(); + while (scanner.advance()) { + Cell cell = scanner.current(); - pendingUpdates.add(cell); ++ pendingUpdates.add(KeyValueUtil.ensureKeyValue(cell)); + } + } catch (IOException e) { + throw new RuntimeException(e); // Impossible + } + } + + public TxTableState(RegionCoprocessorEnvironment env, Set<ColumnReference> indexedColumns, Map<String, byte[]> attributes, long currentTimestamp, Mutation m, ColumnReference emptyColRef, Result r) { + this(env, indexedColumns, attributes, currentTimestamp, m); + + for (ColumnReference ref : indexedColumns) { + Cell cell = r.getColumnLatestCell(ref.getFamily(), ref.getQualifier()); + if (cell != null) { + ImmutableBytesWritable ptr = new ImmutableBytesWritable(); + ptr.set(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()); + valueMap.put(ref, ptr); + } + } + /* + Cell cell = r.getColumnLatestCell(emptyColRef.getFamily(), emptyColRef.getQualifier()); + if (cell != null) { + ImmutableBytesWritable ptr = new ImmutableBytesWritable(); + ptr.set(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()); + valueMap.put(emptyColRef, ptr); + } + */ + } + + @Override + public RegionCoprocessorEnvironment getEnvironment() { + return env; + } + + @Override + public long getCurrentTimestamp() { + return currentTimestamp; + } + + @Override + public Map<String, byte[]> getUpdateAttributes() { + return attributes; + } + + @Override + public byte[] getCurrentRowKey() { + return mutation.getRow(); + } + + @Override + public List<? extends IndexedColumnGroup> getIndexColumnHints() { + return Collections.emptyList(); + } + + public void applyMutation() { + /*if (mutation instanceof Delete) { + valueMap.clear(); + } else */ { + for (Cell cell : pendingUpdates) { + if (cell.getTypeByte() == KeyValue.Type.Delete.getCode() || cell.getTypeByte() == KeyValue.Type.DeleteColumn.getCode()) { + ColumnReference ref = new ColumnReference(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(), cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()); + valueMap.remove(ref); + } else if (cell.getTypeByte() == KeyValue.Type.DeleteFamily.getCode() || cell.getTypeByte() == KeyValue.Type.DeleteFamilyVersion.getCode()) { + for (ColumnReference ref : indexedColumns) { + if (ref.matchesFamily(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength())) { + valueMap.remove(ref); + } + } + } else if (cell.getTypeByte() == KeyValue.Type.Put.getCode()){ + ColumnReference ref = new ColumnReference(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(), cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()); + if (indexedColumns.contains(ref)) { + ImmutableBytesWritable ptr = new ImmutableBytesWritable(); + ptr.set(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()); + valueMap.put(ref, ptr); + } + } else { + throw new IllegalStateException("Unexpected mutation type for " + cell); + } + } + } + } + + @Override - public Collection<Cell> getPendingUpdate() { ++ public Collection<KeyValue> getPendingUpdate() { + return pendingUpdates; + } + + @Override + public Pair<ValueGetter, IndexUpdate> getIndexUpdateState(Collection<? extends ColumnReference> indexedColumns) + throws IOException { + // TODO: creating these objects over and over again is wasteful + ColumnTracker tracker = new ColumnTracker(indexedColumns); + ValueGetter getter = new ValueGetter() { + + @Override + public ImmutableBytesWritable getLatestValue(ColumnReference ref) throws IOException { + return valueMap.get(ref); + } + + @Override + public byte[] getRowKey() { + return mutation.getRow(); + } + + }; + Pair<ValueGetter, IndexUpdate> pair = new Pair<ValueGetter, IndexUpdate>(getter, new IndexUpdate(tracker)); + return pair; + } + } +}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7d3b544a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java ---------------------------------------------------------------------- diff --cc phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java index 4690ccc,57e84f0..c18233e --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java @@@ -49,9 -51,10 +51,11 @@@ import org.apache.phoenix.compile.Query import org.apache.phoenix.compile.RowProjector; import org.apache.phoenix.compile.ScanRanges; import org.apache.phoenix.compile.StatementContext; + import org.apache.phoenix.coprocessor.BaseScannerRegionObserver; + import org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver; import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.exception.SQLExceptionInfo; +import org.apache.phoenix.execute.MutationState; import org.apache.phoenix.filter.ColumnProjectionFilter; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.parse.FilterableStatement; @@@ -102,7 -104,7 +105,8 @@@ public abstract class BaseResultIterato private final byte[] physicalTableName; private final QueryPlan plan; protected final String scanId; + protected final MutationState mutationState; + private final ParallelScanGrouper scanGrouper; // TODO: too much nesting here - breakup into new classes. private final List<List<List<Pair<Scan,Future<PeekingResultIterator>>>>> allFutures; @@@ -133,61 -135,81 +137,84 @@@ return true; } - public BaseResultIterators(QueryPlan plan, Integer perScanLimit) throws SQLException { - super(plan.getContext(), plan.getTableRef(), plan.getGroupBy(), plan.getOrderBy(), plan.getStatement().getHint()); + private static void initializeScan(QueryPlan plan, Integer perScanLimit) { + StatementContext context = plan.getContext(); + TableRef tableRef = plan.getTableRef(); + PTable table = tableRef.getTable(); + Scan scan = context.getScan(); + + Map<byte [], NavigableSet<byte []>> familyMap = scan.getFamilyMap(); + // Hack for PHOENIX-2067 to force raw scan over all KeyValues to fix their row keys + if (context.getConnection().isDescVarLengthRowKeyUpgrade()) { + // We project *all* KeyValues across all column families as we make a pass over + // a physical table and we want to make sure we catch all KeyValues that may be + // dynamic or part of an updatable view. + familyMap.clear(); + scan.setMaxVersions(); + scan.setFilter(null); // Remove any filter + scan.setRaw(true); // Traverse (and subsequently clone) all KeyValues + // Pass over PTable so we can re-write rows according to the row key schema + scan.setAttribute(BaseScannerRegionObserver.UPGRADE_DESC_ROW_KEY, UngroupedAggregateRegionObserver.serialize(table)); + } else { + FilterableStatement statement = plan.getStatement(); + RowProjector projector = plan.getProjector(); + boolean keyOnlyFilter = familyMap.isEmpty() && context.getWhereCoditionColumns().isEmpty(); + if (projector.isProjectEmptyKeyValue()) { + // If nothing projected into scan and we only have one column family, just allow everything + // to be projected and use a FirstKeyOnlyFilter to skip from row to row. This turns out to + // be quite a bit faster. + // Where condition columns also will get added into familyMap + // When where conditions are present, we can not add FirstKeyOnlyFilter at beginning. + if (familyMap.isEmpty() && context.getWhereCoditionColumns().isEmpty() + && table.getColumnFamilies().size() == 1) { + // Project the one column family. We must project a column family since it's possible + // that there are other non declared column families that we need to ignore. + scan.addFamily(table.getColumnFamilies().get(0).getName().getBytes()); + } else { + byte[] ecf = SchemaUtil.getEmptyColumnFamily(table); + // Project empty key value unless the column family containing it has + // been projected in its entirety. + if (!familyMap.containsKey(ecf) || familyMap.get(ecf) != null) { + scan.addColumn(ecf, QueryConstants.EMPTY_COLUMN_BYTES); + } + } + } else if (table.getViewType() == ViewType.MAPPED) { + // Since we don't have the empty key value in MAPPED tables, we must select all CFs in HRS. But only the + // selected column values are returned back to client + for (PColumnFamily family : table.getColumnFamilies()) { + scan.addFamily(family.getName().getBytes()); + } + } + // Add FirstKeyOnlyFilter if there are no references to key value columns + if (keyOnlyFilter) { + ScanUtil.andFilterAtBeginning(scan, new FirstKeyOnlyFilter()); + } + + // TODO adding all CFs here is not correct. It should be done only after ColumnProjectionOptimization. + if (perScanLimit != null) { + ScanUtil.andFilterAtEnd(scan, new PageFilter(perScanLimit)); + } + + doColumnProjectionOptimization(context, scan, table, statement); + } + } + + public BaseResultIterators(QueryPlan plan, Integer perScanLimit, ParallelScanGrouper scanGrouper) throws SQLException { + super(plan.getContext(), plan.getTableRef(), plan.getGroupBy(), plan.getOrderBy(), plan.getStatement().getHint(), plan.getLimit()); this.plan = plan; + this.scanGrouper = scanGrouper; StatementContext context = plan.getContext(); + // Clone MutationState as the one on the connection will change if auto commit is on + // yet we need the original one with the original transaction from TableResultIterator. + this.mutationState = new MutationState(context.getConnection().getMutationState()); TableRef tableRef = plan.getTableRef(); PTable table = tableRef.getTable(); - FilterableStatement statement = plan.getStatement(); - RowProjector projector = plan.getProjector(); physicalTableName = table.getPhysicalName().getBytes(); tableStats = useStats() ? new MetaDataClient(context.getConnection()).getTableStats(table) : PTableStats.EMPTY_STATS; - Scan scan = context.getScan(); // Used to tie all the scans together during logging scanId = UUID.randomUUID().toString(); - Map<byte [], NavigableSet<byte []>> familyMap = scan.getFamilyMap(); - boolean keyOnlyFilter = familyMap.isEmpty() && context.getWhereCoditionColumns().isEmpty(); - if (projector.isProjectEmptyKeyValue()) { - // If nothing projected into scan and we only have one column family, just allow everything - // to be projected and use a FirstKeyOnlyFilter to skip from row to row. This turns out to - // be quite a bit faster. - // Where condition columns also will get added into familyMap - // When where conditions are present, we can not add FirstKeyOnlyFilter at beginning. - if (familyMap.isEmpty() && context.getWhereCoditionColumns().isEmpty() - && table.getColumnFamilies().size() == 1) { - // Project the one column family. We must project a column family since it's possible - // that there are other non declared column families that we need to ignore. - scan.addFamily(table.getColumnFamilies().get(0).getName().getBytes()); - } else { - byte[] ecf = SchemaUtil.getEmptyColumnFamily(table); - // Project empty key value unless the column family containing it has - // been projected in its entirety. - if (!familyMap.containsKey(ecf) || familyMap.get(ecf) != null) { - scan.addColumn(ecf, QueryConstants.EMPTY_COLUMN_BYTES); - } - } - } else if (table.getViewType() == ViewType.MAPPED) { - // Since we don't have the empty key value in MAPPED tables, we must select all CFs in HRS. But only the - // selected column values are returned back to client - for (PColumnFamily family : table.getColumnFamilies()) { - scan.addFamily(family.getName().getBytes()); - } - } - // Add FirstKeyOnlyFilter if there are no references to key value columns - if (keyOnlyFilter) { - ScanUtil.andFilterAtBeginning(scan, new FirstKeyOnlyFilter()); - } - // TODO adding all CFs here is not correct. It should be done only after ColumnProjectionOptimization. - if (perScanLimit != null) { - ScanUtil.andFilterAtEnd(scan, new PageFilter(perScanLimit)); - } - - doColumnProjectionOptimization(context, scan, table, statement); + initializeScan(plan, perScanLimit); this.scans = getParallelScans(); List<KeyRange> splitRanges = Lists.newArrayListWithExpectedSize(scans.size() * ESTIMATED_GUIDEPOSTS_PER_REGION); http://git-wip-us.apache.org/repos/asf/phoenix/blob/7d3b544a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java ---------------------------------------------------------------------- diff --cc phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java index 488641f,f272e55..ef8c4a6 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java @@@ -26,7 -27,7 +27,8 @@@ import java.util.List import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Bytes; + import org.apache.phoenix.compile.StatementContext; +import org.apache.phoenix.execute.MutationState; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.schema.TableRef; @@@ -48,10 -49,10 +50,11 @@@ public class ChunkedResultIterator impl private final ParallelIteratorFactory delegateIteratorFactory; private ImmutableBytesWritable lastKey = new ImmutableBytesWritable(); + private final StatementContext context; private final TableRef tableRef; - private Scan scan; private final long chunkSize; + private final MutationState mutationState; + private Scan scan; private PeekingResultIterator resultIterator; public static class ChunkedResultIteratorFactory implements ParallelIteratorFactory { @@@ -70,19 -67,19 +73,19 @@@ } @Override - public PeekingResultIterator newIterator(ResultIterator scanner, Scan scan) throws SQLException { - scanner.close(); //close the iterator since we don't need it anymore. + public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan, String tableName) throws SQLException { if (logger.isDebugEnabled()) logger.debug(LogUtil.addCustomAnnotations("ChunkedResultIteratorFactory.newIterator over " + tableRef.getTable().getName().getString() + " with " + scan, ScanUtil.getCustomAnnotations(scan))); - return new ChunkedResultIterator(delegateFactory, mutationState, tableRef, scan, - return new ChunkedResultIterator(delegateFactory, context, tableRef, scan, - context.getConnection().getQueryServices().getProps().getLong( - QueryServices.SCAN_RESULT_CHUNK_SIZE, - QueryServicesOptions.DEFAULT_SCAN_RESULT_CHUNK_SIZE), scanner); ++ return new ChunkedResultIterator(delegateFactory, mutationState, context, tableRef, scan, + mutationState.getConnection().getQueryServices().getProps().getLong( + QueryServices.SCAN_RESULT_CHUNK_SIZE, - QueryServicesOptions.DEFAULT_SCAN_RESULT_CHUNK_SIZE)); ++ QueryServicesOptions.DEFAULT_SCAN_RESULT_CHUNK_SIZE), scanner); } } - private ChunkedResultIterator(ParallelIteratorFactory delegateIteratorFactory, - StatementContext context, TableRef tableRef, Scan scan, long chunkSize, ResultIterator scanner) throws SQLException { + private ChunkedResultIterator(ParallelIteratorFactory delegateIteratorFactory, MutationState mutationState, - TableRef tableRef, Scan scan, long chunkSize) throws SQLException { ++ StatementContext context, TableRef tableRef, Scan scan, long chunkSize, ResultIterator scanner) throws SQLException { this.delegateIteratorFactory = delegateIteratorFactory; + this.context = context; this.tableRef = tableRef; this.scan = scan; this.chunkSize = chunkSize; @@@ -122,9 -118,10 +125,10 @@@ scan = ScanUtil.newScan(scan); scan.setStartRow(ByteUtil.copyKeyBytesIfNecessary(lastKey)); if (logger.isDebugEnabled()) logger.debug(LogUtil.addCustomAnnotations("Get next chunked result iterator over " + tableRef.getTable().getName().getString() + " with " + scan, ScanUtil.getCustomAnnotations(scan))); + String tableName = tableRef.getTable().getPhysicalName().getString(); ResultIterator singleChunkResultIterator = new SingleChunkResultIterator( - new TableResultIterator(mutationState, tableRef, scan), chunkSize); - resultIterator = delegateIteratorFactory.newIterator(singleChunkResultIterator, scan); - new TableResultIterator(context, tableRef, scan, context.getReadMetricsQueue().allotMetric(SCAN_BYTES, tableName)), chunkSize); ++ new TableResultIterator(mutationState, tableRef, scan, context.getReadMetricsQueue().allotMetric(SCAN_BYTES, tableName)), chunkSize); + resultIterator = delegateIteratorFactory.newIterator(context, singleChunkResultIterator, scan, tableName); } return resultIterator; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/7d3b544a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java ---------------------------------------------------------------------- diff --cc phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java index ed81110,87f8335..265831c --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java @@@ -86,7 -102,7 +102,7 @@@ public class ParallelIterators extends @Override public PeekingResultIterator call() throws Exception { long startTime = System.currentTimeMillis(); - ResultIterator scanner = new TableResultIterator(mutationState, tableRef, scan); - ResultIterator scanner = new TableResultIterator(context, tableRef, scan, scanMetrics); ++ ResultIterator scanner = new TableResultIterator(mutationState, tableRef, scan, scanMetrics); if (logger.isDebugEnabled()) { logger.debug(LogUtil.addCustomAnnotations("Id: " + scanId + ", Time: " + (System.currentTimeMillis() - startTime) + "ms, Scan: " + scan, ScanUtil.getCustomAnnotations(scan))); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/7d3b544a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java ---------------------------------------------------------------------- diff --cc phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java index 6edb98d,fa18c83..e0936b2 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java @@@ -79,12 -80,8 +80,8 @@@ public class SerialIterators extends Ba public PeekingResultIterator call() throws Exception { List<PeekingResultIterator> concatIterators = Lists.newArrayListWithExpectedSize(scans.size()); for (final Scan scan : scans) { - long startTime = System.currentTimeMillis(); - ResultIterator scanner = new TableResultIterator(mutationState, tableRef, scan, ScannerCreation.DELAYED); - if (logger.isDebugEnabled()) { - logger.debug(LogUtil.addCustomAnnotations("Id: " + scanId + ", Time: " + (System.currentTimeMillis() - startTime) + "ms, Scan: " + scan, ScanUtil.getCustomAnnotations(scan))); - } - concatIterators.add(iteratorFactory.newIterator(scanner, scan)); - ResultIterator scanner = new TableResultIterator(context, tableRef, scan, context.getReadMetricsQueue().allotMetric(SCAN_BYTES, tableName), ScannerCreation.DELAYED); ++ ResultIterator scanner = new TableResultIterator(mutationState, tableRef, scan, context.getReadMetricsQueue().allotMetric(SCAN_BYTES, tableName), ScannerCreation.DELAYED); + concatIterators.add(iteratorFactory.newIterator(context, scanner, scan, tableName)); } PeekingResultIterator concatIterator = ConcatResultIterator.newIterator(concatIterators); allIterators.add(concatIterator); http://git-wip-us.apache.org/repos/asf/phoenix/blob/7d3b544a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java ---------------------------------------------------------------------- diff --cc phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java index e93d917,6f040d1..f1a2496 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java @@@ -22,9 -22,9 +22,12 @@@ import java.sql.SQLException import java.util.List; import org.apache.hadoop.hbase.client.HTableInterface; ++import org.apache.hadoop.hbase.client.Result; ++import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; -import org.apache.phoenix.compile.StatementContext; +import org.apache.phoenix.execute.MutationState; +import org.apache.phoenix.schema.PTable; + import org.apache.phoenix.monitoring.CombinableMetric; import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.schema.tuple.Tuple; import org.apache.phoenix.util.Closeables; @@@ -45,9 -45,10 +48,10 @@@ public class TableResultIterator implem private final Scan scan; private final HTableInterface htable; private volatile ResultIterator delegate; - - public TableResultIterator(MutationState mutationState, Scan scan, TableRef tableRef) throws SQLException { - this(mutationState, tableRef, scan); + private final CombinableMetric scanMetrics; + - public TableResultIterator(StatementContext context, TableRef tableRef, CombinableMetric scanMetrics) throws SQLException { - this(context, tableRef, context.getScan(), scanMetrics); ++ public TableResultIterator(MutationState mutationState, Scan scan, TableRef tableRef, CombinableMetric scanMetrics) throws SQLException { ++ this(mutationState, tableRef, scan, scanMetrics); } /* @@@ -63,7 -64,7 +67,12 @@@ delegate = this.delegate; if (delegate == null) { try { - this.delegate = delegate = isClosing ? ResultIterator.EMPTY_ITERATOR : new ScanningResultIterator(htable.getScanner(scan)); ++ ResultScanner resultScanner = htable.getScanner(scan); ++ Result result = null; ++ while ( (result = resultScanner.next())!=null ) { ++ System.out.println(result); ++ } + this.delegate = delegate = isClosing ? ResultIterator.EMPTY_ITERATOR : new ScanningResultIterator(htable.getScanner(scan), scanMetrics); } catch (IOException e) { Closeables.closeQuietly(htable); throw ServerUtil.parseServerException(e); @@@ -74,14 -75,15 +83,15 @@@ return delegate; } - public TableResultIterator(MutationState mutationState, TableRef tableRef, Scan scan) throws SQLException { - this(mutationState, tableRef, scan, ScannerCreation.IMMEDIATE); - public TableResultIterator(StatementContext context, TableRef tableRef, Scan scan, CombinableMetric scanMetrics) throws SQLException { - this(context, tableRef, scan, scanMetrics, ScannerCreation.IMMEDIATE); ++ public TableResultIterator(MutationState mutationState, TableRef tableRef, Scan scan, CombinableMetric scanMetrics) throws SQLException { ++ this(mutationState, tableRef, scan, scanMetrics, ScannerCreation.IMMEDIATE); } - public TableResultIterator(MutationState mutationState, TableRef tableRef, Scan scan, ScannerCreation creationMode) throws SQLException { - public TableResultIterator(StatementContext context, TableRef tableRef, Scan scan, CombinableMetric scanMetrics, ScannerCreation creationMode) throws SQLException { - super(context, tableRef); ++ public TableResultIterator(MutationState mutationState, TableRef tableRef, Scan scan, CombinableMetric scanMetrics, ScannerCreation creationMode) throws SQLException { this.scan = scan; + this.scanMetrics = scanMetrics; - htable = context.getConnection().getQueryServices().getTable(tableRef.getTable().getPhysicalName().getBytes()); + PTable table = tableRef.getTable(); - this.htable = mutationState.getHTable(table); ++ htable = mutationState.getHTable(table); if (creationMode == ScannerCreation.IMMEDIATE) { getDelegate(false); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/7d3b544a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java ---------------------------------------------------------------------- diff --cc phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java index e1c8dea,6985fc3..153a76e --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java @@@ -54,9 -53,10 +53,12 @@@ import java.util.concurrent.Executor import javax.annotation.Nullable; +import co.cask.tephra.Transaction; + import org.apache.hadoop.hbase.HConstants; + import org.apache.hadoop.hbase.client.Consistency; + import org.apache.htrace.Sampler; + import org.apache.htrace.TraceScope; import org.apache.phoenix.call.CallRunner; import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.exception.SQLExceptionInfo; @@@ -154,28 -155,35 +161,39 @@@ public class PhoenixConnection implemen return props; } - public PhoenixConnection(PhoenixConnection connection) throws SQLException { - this(connection.getQueryServices(), connection.getURL(), connection.getClientInfo(), connection.metaData, connection.getMutationState()); + public PhoenixConnection(PhoenixConnection connection, boolean isDescRowKeyOrderUpgrade) throws SQLException { - this(connection.getQueryServices(), connection.getURL(), connection.getClientInfo(), connection.getMetaDataCache(), isDescRowKeyOrderUpgrade); ++ this(connection.getQueryServices(), connection.getURL(), connection.getClientInfo(), connection.metaData, connection.getMutationState(), isDescRowKeyOrderUpgrade); this.isAutoCommit = connection.isAutoCommit; this.sampler = connection.sampler; + this.statementExecutionCounter = connection.statementExecutionCounter; + } + + public PhoenixConnection(PhoenixConnection connection) throws SQLException { + this(connection, connection.isDescVarLengthRowKeyUpgrade); + } + + public PhoenixConnection(PhoenixConnection connection, long scn) throws SQLException { + this(connection.getQueryServices(), connection, scn); } public PhoenixConnection(ConnectionQueryServices services, PhoenixConnection connection, long scn) throws SQLException { - this(services, connection.getURL(), newPropsWithSCN(scn,connection.getClientInfo()), connection.metaData, connection.getMutationState()); - this(services, connection.getURL(), newPropsWithSCN(scn,connection.getClientInfo()), connection.getMetaDataCache(), connection.isDescVarLengthRowKeyUpgrade()); ++ this(services, connection.getURL(), newPropsWithSCN(scn,connection.getClientInfo()), connection.metaData, connection.getMutationState(), connection.isDescVarLengthRowKeyUpgrade()); this.isAutoCommit = connection.isAutoCommit; this.sampler = connection.sampler; + this.statementExecutionCounter = connection.statementExecutionCounter; } public PhoenixConnection(ConnectionQueryServices services, String url, Properties info, PMetaData metaData) throws SQLException { - this(services, url, info, metaData, null); - this(services, url, info, metaData, false); ++ this(services, url, info, metaData, null, false); } - - public PhoenixConnection(ConnectionQueryServices services, String url, Properties info, PMetaData metaData, boolean isDescVarLengthRowKeyUpgrade) throws SQLException { + + public PhoenixConnection(PhoenixConnection connection, ConnectionQueryServices services, Properties info) throws SQLException { - this(services, connection.url, info, connection.metaData, null); ++ this(services, connection.url, info, connection.metaData, null, connection.isDescVarLengthRowKeyUpgrade()); + } + - public PhoenixConnection(ConnectionQueryServices services, String url, Properties info, PMetaData metaData, MutationState mutationState) throws SQLException { ++ public PhoenixConnection(ConnectionQueryServices services, String url, Properties info, PMetaData metaData, MutationState mutationState, boolean isDescVarLengthRowKeyUpgrade) throws SQLException { this.url = url; + this.isDescVarLengthRowKeyUpgrade = isDescVarLengthRowKeyUpgrade; // Copy so client cannot change this.info = info == null ? new Properties() : PropertiesUtil.deepCopy(info); final PName tenantId = JDBCUtil.getTenantId(url, info); @@@ -243,8 -245,17 +255,17 @@@ ! Objects.equal(tenantId, table.getTenantId())) ); } - }); - this.mutationState = mutationState == null ? new MutationState(maxSize, this) : new MutationState(mutationState); + @Override + public boolean prune(PFunction function) { + long maxTimestamp = scn == null ? HConstants.LATEST_TIMESTAMP : scn; + return ( function.getTimeStamp() >= maxTimestamp || + ! Objects.equal(tenantId, function.getTenantId())); + } + }; + this.isRequestLevelMetricsEnabled = JDBCUtil.isCollectingRequestLevelMetricsEnabled(url, info, this.services.getProps()); - this.mutationState = newMutationState(maxSize); ++ this.mutationState = mutationState == null ? newMutationState(maxSize) : new MutationState(mutationState); + this.metaData = metaData.pruneTables(pruner); + this.metaData = metaData.pruneFunctions(pruner); this.services.addConnection(this); // setup tracing, if its enabled @@@ -372,14 -406,14 +416,22 @@@ return mutateBatchSize; } + public PMetaData getMetaDataCache() { + return metaData; + } ++ + public PTable getTable(PTableKey key) throws TableNotFoundException { + return metaData.getTableRef(key).getTable(); + } + + public PTableRef getTableRef(PTableKey key) throws TableNotFoundException { + return metaData.getTableRef(key); + } + protected MutationState newMutationState(int maxSize) { + return new MutationState(maxSize, this); + } + public MutationState getMutationState() { return mutationState; } @@@ -647,13 -687,8 +705,14 @@@ @Override public void rollback() throws SQLException { - mutationState.rollback(this); + CallRunner.run(new CallRunner.CallableThrowable<Void, SQLException>() { + @Override + public Void call() throws SQLException { + mutationState.rollback(); + return null; + } + }, Tracing.withTracing(this, "rolling back")); + statementExecutionCounter = 0; } @Override @@@ -758,29 -800,37 +824,41 @@@ // TODO Auto-generated method stub return 0; } - + @Override - public PMetaData addTable(PTable table) throws SQLException { - // TODO: since a connection is only used by one thread at a time, - // we could modify this metadata in place since it's not shared. - if (scn == null || scn > table.getTimeStamp()) { - metaData = metaData.addTable(table); - } + public PMetaData addTable(PTable table, long resolvedTime) throws SQLException { + metaData = metaData.addTable(table, resolvedTime); //Cascade through to connectionQueryServices too - getQueryServices().addTable(table); + getQueryServices().addTable(table, resolvedTime); + return metaData; + } + + @Override + public PMetaData updateResolvedTimestamp(PTable table, long resolvedTime) throws SQLException { + metaData = metaData.updateResolvedTimestamp(table, resolvedTime); + //Cascade through to connectionQueryServices too + getQueryServices().updateResolvedTimestamp(table, resolvedTime); return metaData; } + + @Override + public PMetaData addFunction(PFunction function) throws SQLException { + // TODO: since a connection is only used by one thread at a time, + // we could modify this metadata in place since it's not shared. + if (scn == null || scn > function.getTimeStamp()) { + metaData = metaData.addFunction(function); + } + //Cascade through to connectionQueryServices too + getQueryServices().addFunction(function); + return metaData; + } @Override - public PMetaData addColumn(PName tenantId, String tableName, List<PColumn> columns, long tableTimeStamp, long tableSeqNum, boolean isImmutableRows, boolean isWalDisabled, boolean isMultitenant, boolean storeNulls) + public PMetaData addColumn(PName tenantId, String tableName, List<PColumn> columns, long tableTimeStamp, long tableSeqNum, boolean isImmutableRows, boolean isWalDisabled, boolean isMultitenant, boolean storeNulls, long resolvedTime) throws SQLException { - metaData = metaData.addColumn(tenantId, tableName, columns, tableTimeStamp, tableSeqNum, isImmutableRows, isWalDisabled, isMultitenant, storeNulls); + metaData = metaData.addColumn(tenantId, tableName, columns, tableTimeStamp, tableSeqNum, isImmutableRows, isWalDisabled, isMultitenant, storeNulls, resolvedTime); //Cascade through to connectionQueryServices too - getQueryServices().addColumn(tenantId, tableName, columns, tableTimeStamp, tableSeqNum, isImmutableRows, isWalDisabled, isMultitenant, storeNulls); + getQueryServices().addColumn(tenantId, tableName, columns, tableTimeStamp, tableSeqNum, isImmutableRows, isWalDisabled, isMultitenant, storeNulls, resolvedTime); return metaData; } @@@ -793,11 -843,19 +871,19 @@@ } @Override + public PMetaData removeFunction(PName tenantId, String functionName, long tableTimeStamp) throws SQLException { + metaData = metaData.removeFunction(tenantId, functionName, tableTimeStamp); + //Cascade through to connectionQueryServices too + getQueryServices().removeFunction(tenantId, functionName, tableTimeStamp); + return metaData; + } + + @Override public PMetaData removeColumn(PName tenantId, String tableName, List<PColumn> columnsToRemove, long tableTimeStamp, - long tableSeqNum) throws SQLException { - metaData = metaData.removeColumn(tenantId, tableName, columnsToRemove, tableTimeStamp, tableSeqNum); + long tableSeqNum, long resolvedTime) throws SQLException { + metaData = metaData.removeColumn(tenantId, tableName, columnsToRemove, tableTimeStamp, tableSeqNum, resolvedTime); //Cascade through to connectionQueryServices too - getQueryServices().removeColumn(tenantId, tableName, columnsToRemove, tableTimeStamp, tableSeqNum); + getQueryServices().removeColumn(tenantId, tableName, columnsToRemove, tableTimeStamp, tableSeqNum, resolvedTime); return metaData; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/7d3b544a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/7d3b544a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDriver.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/7d3b544a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriver.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/7d3b544a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java ---------------------------------------------------------------------- diff --cc phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java index 6376de7,19b34e6..55ae8b3 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java @@@ -33,16 -34,19 +34,22 @@@ import java.text.Format import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.Iterator; import java.util.List; + import java.util.Map; +import java.util.Set; + import org.apache.hadoop.conf.Configuration; + import org.apache.hadoop.fs.FileSystem; + import org.apache.hadoop.fs.Path; + import org.apache.hadoop.hbase.client.Consistency; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.util.Pair; import org.apache.phoenix.call.CallRunner; +import org.apache.phoenix.compile.BaseMutationPlan; import org.apache.phoenix.compile.ColumnProjector; import org.apache.phoenix.compile.ColumnResolver; + import org.apache.phoenix.compile.CreateFunctionCompiler; import org.apache.phoenix.compile.CreateIndexCompiler; import org.apache.phoenix.compile.CreateSequenceCompiler; import org.apache.phoenix.compile.CreateTableCompiler; @@@ -236,14 -260,9 +263,14 @@@ public class PhoenixStatement implement public PhoenixResultSet call() throws SQLException { final long startTime = System.currentTimeMillis(); try { - QueryPlan plan = stmt.compilePlan(PhoenixStatement.this, Sequence.ValueOp.RESERVE_SEQUENCE); - QueryPlan plan = stmt.compilePlan(PhoenixStatement.this, Sequence.ValueOp.VALIDATE_SEQUENCE); - plan = connection.getQueryServices().getOptimizer().optimize( - PhoenixStatement.this, plan); ++ QueryPlan plan = stmt.compilePlan(PhoenixStatement.this, Sequence.ValueOp.VALIDATE_SEQUENCE); + // Send mutations to hbase, so they are visible to subsequent reads. + // Use original plan for data table so that data and immutable indexes will be sent + // TODO: for joins, we need to iterate through all tables, but we need the original table, + // not the projected table, so plan.getContext().getResolver().getTables() won't work. + Iterator<TableRef> tableRefs = plan.getSourceRefs().iterator(); + connection.getMutationState().sendUncommitted(tableRefs); + plan = connection.getQueryServices().getOptimizer().optimize(PhoenixStatement.this, plan); // this will create its own trace internally, so we don't wrap this // whole thing in tracing ResultIterator resultIterator = plan.iterator(); @@@ -257,10 -278,7 +286,11 @@@ setLastResultSet(rs); setLastUpdateCount(NO_UPDATE); setLastUpdateOperation(stmt.getOperation()); + // If transactional, this will move the read pointer forward + if (connection.getAutoCommit()) { + connection.commit(); + } + connection.incrementStatementExecutionCounter(); return rs; } catch (RuntimeException e) { // FIXME: Expression.evaluate does not throw SQLException @@@ -296,17 -314,13 +326,17 @@@ new CallRunner.CallableThrowable<Integer, SQLException>() { @Override public Integer call() throws SQLException { - // Note that the upsert select statements will need to commit any open transaction here, - // since they'd update data directly from coprocessors, and should thus operate on - // the latest state try { + MutationState state = connection.getMutationState(); - MutationPlan plan = stmt.compilePlan(PhoenixStatement.this, Sequence.ValueOp.RESERVE_SEQUENCE); + MutationPlan plan = stmt.compilePlan(PhoenixStatement.this, Sequence.ValueOp.VALIDATE_SEQUENCE); - MutationState state = plan.execute(); - connection.getMutationState().join(state); + if (plan.getTargetRef() != null && plan.getTargetRef().getTable() != null && plan.getTargetRef().getTable().isTransactional()) { + state.startTransaction(); + } + Iterator<TableRef> tableRefs = plan.getSourceRefs().iterator(); + state.sendUncommitted(tableRefs); + state.checkpoint(plan); + MutationState lastState = plan.execute(); + state.join(lastState); if (connection.getAutoCommit()) { connection.commit(); } @@@ -314,9 -328,10 +344,10 @@@ setLastQueryPlan(null); // Unfortunately, JDBC uses an int for update count, so we // just max out at Integer.MAX_VALUE - int lastUpdateCount = (int) Math.min(Integer.MAX_VALUE, state.getUpdateCount()); + int lastUpdateCount = (int) Math.min(Integer.MAX_VALUE, lastState.getUpdateCount()); setLastUpdateCount(lastUpdateCount); setLastUpdateOperation(stmt.getOperation()); + connection.incrementStatementExecutionCounter(); return lastUpdateCount; } catch (RuntimeException e) { // FIXME: Expression.evaluate does not throw SQLException @@@ -504,10 -529,10 +550,14 @@@ return true; } + @Override + public Operation getOperation() { + return this.getOperation(); + } + @Override + public boolean useRoundRobinIterator() throws SQLException { + return false; + } }; } @@@ -521,7 -546,10 +571,10 @@@ @SuppressWarnings("unchecked") @Override public MutationPlan compilePlan(PhoenixStatement stmt, Sequence.ValueOp seqAction) throws SQLException { - UpsertCompiler compiler = new UpsertCompiler(stmt, this.getOperation()); + if(!getUdfParseNodes().isEmpty()) { + stmt.throwIfUnallowedUserDefinedFunctions(getUdfParseNodes()); + } - UpsertCompiler compiler = new UpsertCompiler(stmt); ++ UpsertCompiler compiler = new UpsertCompiler(stmt, this.getOperation()); MutationPlan plan = compiler.compile(this); plan.getContext().getSequenceManager().validateSequences(seqAction); return plan; @@@ -536,7 -564,10 +589,10 @@@ @SuppressWarnings("unchecked") @Override public MutationPlan compilePlan(PhoenixStatement stmt, Sequence.ValueOp seqAction) throws SQLException { - DeleteCompiler compiler = new DeleteCompiler(stmt, this.getOperation()); + if(!getUdfParseNodes().isEmpty()) { + stmt.throwIfUnallowedUserDefinedFunctions(getUdfParseNodes()); + } - DeleteCompiler compiler = new DeleteCompiler(stmt); ++ DeleteCompiler compiler = new DeleteCompiler(stmt, this.getOperation()); MutationPlan plan = compiler.compile(this); plan.getContext().getSequenceManager().validateSequences(seqAction); return plan; @@@ -558,6 -589,209 +614,179 @@@ } } + private static class ExecutableCreateFunctionStatement extends CreateFunctionStatement implements CompilableStatement { + + public ExecutableCreateFunctionStatement(PFunction functionInfo, boolean temporary, boolean isReplace) { + super(functionInfo, temporary, isReplace); + } + + + @SuppressWarnings("unchecked") + @Override + public MutationPlan compilePlan(final PhoenixStatement stmt, Sequence.ValueOp seqAction) throws SQLException { + stmt.throwIfUnallowedUserDefinedFunctions(Collections.EMPTY_MAP); + CreateFunctionCompiler compiler = new CreateFunctionCompiler(stmt); + return compiler.compile(this); + } + } + + private static class ExecutableDropFunctionStatement extends DropFunctionStatement implements CompilableStatement { + + public ExecutableDropFunctionStatement(String functionName, boolean ifNotExists) { + super(functionName, ifNotExists); + } + + @SuppressWarnings("unchecked") + @Override + public MutationPlan compilePlan(final PhoenixStatement stmt, Sequence.ValueOp seqAction) throws SQLException { + final StatementContext context = new StatementContext(stmt); - return new MutationPlan() { - - @Override - public StatementContext getContext() { - return context; - } ++ return new BaseMutationPlan(context, this.getOperation()) { + + @Override + public ParameterMetaData getParameterMetaData() { + return PhoenixParameterMetaData.EMPTY_PARAMETER_META_DATA; + } + + @Override + public ExplainPlan getExplainPlan() throws SQLException { - return new ExplainPlan(Collections.singletonList("DROP TABLE")); - } - - @Override - public PhoenixConnection getConnection() { - return stmt.getConnection(); ++ return new ExplainPlan(Collections.singletonList("DROP FUNCTION")); + } + + @Override + public MutationState execute() throws SQLException { - MetaDataClient client = new MetaDataClient(getConnection()); ++ MetaDataClient client = new MetaDataClient(getContext().getConnection()); + return client.dropFunction(ExecutableDropFunctionStatement.this); + } + }; + } + } + + private static class ExecutableAddJarsStatement extends AddJarsStatement implements CompilableStatement { + + public ExecutableAddJarsStatement(List<LiteralParseNode> jarPaths) { + super(jarPaths); + } + + + @SuppressWarnings("unchecked") + @Override + public MutationPlan compilePlan(final PhoenixStatement stmt, Sequence.ValueOp seqAction) throws SQLException { + final StatementContext context = new StatementContext(stmt); - return new MutationPlan() { - - @Override - public StatementContext getContext() { - return context; - } ++ return new BaseMutationPlan(context, this.getOperation()) { + + @Override + public ParameterMetaData getParameterMetaData() { + return PhoenixParameterMetaData.EMPTY_PARAMETER_META_DATA; + } + + @Override + public ExplainPlan getExplainPlan() throws SQLException { + return new ExplainPlan(Collections.singletonList("ADD JARS")); + } + + @Override - public PhoenixConnection getConnection() { - return stmt.getConnection(); - } - - @Override + public MutationState execute() throws SQLException { + String dynamicJarsDir = stmt.getConnection().getQueryServices().getProps().get(QueryServices.DYNAMIC_JARS_DIR_KEY); + if(dynamicJarsDir == null) { + throw new SQLException(QueryServices.DYNAMIC_JARS_DIR_KEY+" is not configured for placing the jars."); + } + dynamicJarsDir = + dynamicJarsDir.endsWith("/") ? dynamicJarsDir : dynamicJarsDir + '/'; + Configuration conf = HBaseFactoryProvider.getConfigurationFactory().getConfiguration(); + Path dynamicJarsDirPath = new Path(dynamicJarsDir); + for (LiteralParseNode jarPath : getJarPaths()) { + String jarPathStr = (String)jarPath.getValue(); + if(!jarPathStr.endsWith(".jar")) { + throw new SQLException(jarPathStr + " is not a valid jar file path."); + } + } + + try { + FileSystem fs = dynamicJarsDirPath.getFileSystem(conf); + List<LiteralParseNode> jarPaths = getJarPaths(); + for (LiteralParseNode jarPath : jarPaths) { + File f = new File((String) jarPath.getValue()); + fs.copyFromLocalFile(new Path(f.getAbsolutePath()), new Path( + dynamicJarsDir + f.getName())); + } + } catch(IOException e) { + throw new SQLException(e); + } + return new MutationState(0, context.getConnection()); + } + }; + + } + } + + private static class ExecutableDeleteJarStatement extends DeleteJarStatement implements CompilableStatement { + + public ExecutableDeleteJarStatement(LiteralParseNode jarPath) { + super(jarPath); + } + + + @SuppressWarnings("unchecked") + @Override + public MutationPlan compilePlan(final PhoenixStatement stmt, Sequence.ValueOp seqAction) throws SQLException { + final StatementContext context = new StatementContext(stmt); - return new MutationPlan() { - - @Override - public StatementContext getContext() { - return context; - } ++ return new BaseMutationPlan(context, this.getOperation()) { + + @Override + public ParameterMetaData getParameterMetaData() { + return PhoenixParameterMetaData.EMPTY_PARAMETER_META_DATA; + } + + @Override + public ExplainPlan getExplainPlan() throws SQLException { + return new ExplainPlan(Collections.singletonList("DELETE JAR")); + } + + @Override - public PhoenixConnection getConnection() { - return stmt.getConnection(); - } - - @Override + public MutationState execute() throws SQLException { + String dynamicJarsDir = stmt.getConnection().getQueryServices().getProps().get(QueryServices.DYNAMIC_JARS_DIR_KEY); + if (dynamicJarsDir == null) { + throw new SQLException(QueryServices.DYNAMIC_JARS_DIR_KEY + + " is not configured."); + } + dynamicJarsDir = + dynamicJarsDir.endsWith("/") ? dynamicJarsDir : dynamicJarsDir + '/'; + Configuration conf = HBaseFactoryProvider.getConfigurationFactory().getConfiguration(); + Path dynamicJarsDirPath = new Path(dynamicJarsDir); + try { + FileSystem fs = dynamicJarsDirPath.getFileSystem(conf); + String jarPathStr = (String)getJarPath().getValue(); + if(!jarPathStr.endsWith(".jar")) { + throw new SQLException(jarPathStr + " is not a valid jar file path."); + } + Path p = new Path(jarPathStr); + if(fs.exists(p)) { + fs.delete(p, false); + } + } catch(IOException e) { + throw new SQLException(e); + } + return new MutationState(0, context.getConnection()); + } + }; + + } + } + + private static class ExecutableListJarsStatement extends ListJarsStatement implements CompilableStatement { + + public ExecutableListJarsStatement() { + super(); + } + + + @SuppressWarnings("unchecked") + @Override + public QueryPlan compilePlan(final PhoenixStatement stmt, Sequence.ValueOp seqAction) throws SQLException { + return new ListJarsQueryPlan(stmt); + } + } + private static class ExecutableCreateIndexStatement extends CreateIndexStatement implements CompilableStatement { public ExecutableCreateIndexStatement(NamedNode indexName, NamedTableNode dataTable, IndexKeyConstraint ikConstraint, List<ColumnName> includeColumns, List<ParseNode> splits, @@@ -568,7 -802,10 +797,10 @@@ @SuppressWarnings("unchecked") @Override public MutationPlan compilePlan(PhoenixStatement stmt, Sequence.ValueOp seqAction) throws SQLException { - CreateIndexCompiler compiler = new CreateIndexCompiler(stmt, this.getOperation()); + if(!getUdfParseNodes().isEmpty()) { + stmt.throwIfUnallowedUserDefinedFunctions(getUdfParseNodes()); + } - CreateIndexCompiler compiler = new CreateIndexCompiler(stmt); ++ CreateIndexCompiler compiler = new CreateIndexCompiler(stmt, this.getOperation()); return compiler.compile(this); } } @@@ -695,10 -978,58 +927,54 @@@ } } + private static class ExecutableAlterSessionStatement extends AlterSessionStatement implements CompilableStatement { + + public ExecutableAlterSessionStatement(Map<String,Object> props) { + super(props); + } + + @SuppressWarnings("unchecked") + @Override + public MutationPlan compilePlan(final PhoenixStatement stmt, Sequence.ValueOp seqAction) throws SQLException { + final StatementContext context = new StatementContext(stmt); - return new MutationPlan() { ++ return new BaseMutationPlan(context, this.getOperation()) { + + @Override + public StatementContext getContext() { + return context; + } + + @Override + public ParameterMetaData getParameterMetaData() { + return PhoenixParameterMetaData.EMPTY_PARAMETER_META_DATA; + } + + @Override + public ExplainPlan getExplainPlan() throws SQLException { + return new ExplainPlan(Collections.singletonList("ALTER SESSION")); + } + - @Override - public PhoenixConnection getConnection() { - return stmt.getConnection(); - } + + @Override + public MutationState execute() throws SQLException { + Object consistency = getProps().get(PhoenixRuntime.CONSISTENCY_ATTRIB.toUpperCase()); + if(consistency != null) { + if (((String)consistency).equalsIgnoreCase(Consistency.TIMELINE.toString())){ - getConnection().setConsistency(Consistency.TIMELINE); ++ getContext().getConnection().setConsistency(Consistency.TIMELINE); + } else { - getConnection().setConsistency(Consistency.STRONG); ++ getContext().getConnection().setConsistency(Consistency.STRONG); + } + } + return new MutationState(0, context.getConnection()); + } + }; + } + } + private static class ExecutableUpdateStatisticsStatement extends UpdateStatisticsStatement implements CompilableStatement { - public ExecutableUpdateStatisticsStatement(NamedTableNode table, StatisticsCollectionScope scope) { - super(table, scope); + public ExecutableUpdateStatisticsStatement(NamedTableNode table, StatisticsCollectionScope scope, Map<String,Object> props) { + super(table, scope, props); } @SuppressWarnings("unchecked") http://git-wip-us.apache.org/repos/asf/phoenix/blob/7d3b544a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java ---------------------------------------------------------------------- diff --cc phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java index 5500ee8,b500a25..d2f91b3 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java @@@ -99,8 -104,12 +104,12 @@@ public class PhoenixRecordReader<T exte final List<Scan> scans = pSplit.getScans(); try { List<PeekingResultIterator> iterators = Lists.newArrayListWithExpectedSize(scans.size()); + StatementContext ctx = queryPlan.getContext(); + ReadMetricQueue readMetrics = ctx.getReadMetricsQueue(); + String tableName = queryPlan.getTableRef().getTable().getPhysicalName().getString(); for (Scan scan : scans) { - final TableResultIterator tableResultIterator = new TableResultIterator(queryPlan.getContext().getConnection().getMutationState(), queryPlan.getTableRef(),scan); - final TableResultIterator tableResultIterator = new TableResultIterator(queryPlan.getContext(), ++ final TableResultIterator tableResultIterator = new TableResultIterator(queryPlan.getContext().getConnection().getMutationState(), + queryPlan.getTableRef(), scan, readMetrics.allotMetric(SCAN_BYTES, tableName)); PeekingResultIterator peekingResultIterator = LookAheadResultIterator.wrap(tableResultIterator); iterators.add(peekingResultIterator); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/7d3b544a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java ---------------------------------------------------------------------- diff --cc phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java index 0000000,32b66f1..3bc3808 mode 000000,100644..100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java @@@ -1,0 -1,182 +1,182 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.phoenix.mapreduce.index; + + import java.io.IOException; + import java.sql.Connection; + import java.sql.PreparedStatement; + import java.sql.SQLException; + import java.util.Iterator; + import java.util.List; + import java.util.Properties; + import java.util.UUID; + + import org.apache.hadoop.conf.Configuration; + import org.apache.hadoop.hbase.client.Mutation; + import org.apache.hadoop.hbase.io.ImmutableBytesWritable; + import org.apache.hadoop.hbase.util.Pair; + import org.apache.hadoop.io.IntWritable; + import org.apache.hadoop.io.NullWritable; + import org.apache.hadoop.mapreduce.Mapper; + import org.apache.phoenix.execute.MutationState; + import org.apache.phoenix.jdbc.PhoenixConnection; + import org.apache.phoenix.mapreduce.PhoenixJobCounters; + import org.apache.phoenix.mapreduce.util.ConnectionUtil; + import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil; + import org.apache.phoenix.query.ConnectionQueryServices; + import org.apache.phoenix.query.QueryServices; + import org.apache.phoenix.query.QueryServicesOptions; + import org.apache.phoenix.util.ColumnInfo; + import org.apache.phoenix.util.PhoenixRuntime; + import org.slf4j.Logger; + import org.slf4j.LoggerFactory; + + /** + * Mapper that hands over rows from data table to the index table. + */ + public class PhoenixIndexImportDirectMapper extends + Mapper<NullWritable, PhoenixIndexDBWritable, ImmutableBytesWritable, IntWritable> { + + private static final Logger LOG = LoggerFactory.getLogger(PhoenixIndexImportDirectMapper.class); + + private final PhoenixIndexDBWritable indxWritable = new PhoenixIndexDBWritable(); + + private List<ColumnInfo> indxTblColumnMetadata; + + private Connection connection; + + private PreparedStatement pStatement; + + private DirectHTableWriter writer; + + private int batchSize; + + private MutationState mutationState; + + @Override + protected void setup(final Context context) throws IOException, InterruptedException { + super.setup(context); + final Configuration configuration = context.getConfiguration(); + writer = new DirectHTableWriter(configuration); + + try { + indxTblColumnMetadata = + PhoenixConfigurationUtil.getUpsertColumnMetadataList(configuration); + indxWritable.setColumnMetadata(indxTblColumnMetadata); + + final Properties overrideProps = new Properties(); + overrideProps.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, + configuration.get(PhoenixConfigurationUtil.CURRENT_SCN_VALUE)); + connection = ConnectionUtil.getOutputConnection(configuration, overrideProps); + connection.setAutoCommit(false); + // Get BatchSize + ConnectionQueryServices services = ((PhoenixConnection) connection).getQueryServices(); + int maxSize = + services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_ATTRIB, + QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE); + batchSize = Math.min(((PhoenixConnection) connection).getMutateBatchSize(), maxSize); + LOG.info("Mutation Batch Size = " + batchSize); + + final String upsertQuery = PhoenixConfigurationUtil.getUpsertStatement(configuration); + this.pStatement = connection.prepareStatement(upsertQuery); + + } catch (SQLException e) { + throw new RuntimeException(e.getMessage()); + } + } + + @Override + protected void map(NullWritable key, PhoenixIndexDBWritable record, Context context) + throws IOException, InterruptedException { + + context.getCounter(PhoenixJobCounters.INPUT_RECORDS).increment(1); + + try { + final List<Object> values = record.getValues(); + indxWritable.setValues(values); + indxWritable.write(this.pStatement); + this.pStatement.execute(); + + final PhoenixConnection pconn = connection.unwrap(PhoenixConnection.class); + MutationState currentMutationState = pconn.getMutationState(); + if (mutationState == null) { + mutationState = currentMutationState; + return; + } + // Keep accumulating Mutations till batch size + mutationState.join(currentMutationState); + + // Write Mutation Batch + if (context.getCounter(PhoenixJobCounters.INPUT_RECORDS).getValue() % batchSize == 0) { + writeBatch(mutationState, context); + mutationState = null; + } + + // Make sure progress is reported to Application Master. + context.progress(); + } catch (SQLException e) { + LOG.error(" Error {} while read/write of a record ", e.getMessage()); + context.getCounter(PhoenixJobCounters.FAILED_RECORDS).increment(1); + throw new RuntimeException(e); + } + } + + private void writeBatch(MutationState mutationState, Context context) throws IOException, + SQLException, InterruptedException { - final Iterator<Pair<byte[], List<Mutation>>> iterator = mutationState.toMutations(true); ++ final Iterator<Pair<byte[], List<Mutation>>> iterator = mutationState.toMutations(true, null); + while (iterator.hasNext()) { + Pair<byte[], List<Mutation>> mutationPair = iterator.next(); + + writer.write(mutationPair.getSecond()); + context.getCounter(PhoenixJobCounters.OUTPUT_RECORDS).increment( + mutationPair.getSecond().size()); + } + connection.rollback(); + } + + @Override + protected void cleanup(Context context) throws IOException, InterruptedException { + try { + // Write the last & final Mutation Batch + if (mutationState != null) { + writeBatch(mutationState, context); + } + // We are writing some dummy key-value as map output here so that we commit only one + // output to reducer. + context.write(new ImmutableBytesWritable(UUID.randomUUID().toString().getBytes()), + new IntWritable(0)); + super.cleanup(context); + } catch (SQLException e) { + LOG.error(" Error {} while read/write of a record ", e.getMessage()); + context.getCounter(PhoenixJobCounters.FAILED_RECORDS).increment(1); + throw new RuntimeException(e); + } finally { + if (connection != null) { + try { + connection.close(); + } catch (SQLException e) { + LOG.error("Error {} while closing connection in the PhoenixIndexMapper class ", + e.getMessage()); + } + } + if (writer != null) { + writer.close(); + } + } + } + }
