IMPALA-3346: DeepCopy() Kudu rows into Impala tuples. Implements additional changes to make the memory layout of Impala tuples produced by Kudu scans identical to the layout of Kudu rows. In particular, Kudu rows allocate a null bit even for non-nullable columns, and Impala now does the same for Kudu scan tuples.
This change exploits the now-identical Kudu and Impala tuple layouts to avoid the expensive translation. Perf: Mostafa reported a 50% efficiency gain on full table scans. Testing: A private core/hdfs run passed. TODO: 1) Test cases with nullable/nonnullable non-PK slots. 2) Specify mem layout to client (depends on KUDU-1694) 3) Avoid mem copies (depends on KUDU-1695) Change-Id: Ic911e4eff9fe98bf28d8a1bab5c9d7e9ab66d9cb Reviewed-on: http://gerrit.cloudera.org:8080/4862 Reviewed-by: Dan Hecht <[email protected]> Tested-by: Marcel Kornacker <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/9f3f4b71 Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/9f3f4b71 Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/9f3f4b71 Branch: refs/heads/master Commit: 9f3f4b713d78e8dcf02c02def447195a04f408e6 Parents: 12e34b4 Author: Alex Behm <[email protected]> Authored: Mon Oct 24 20:46:34 2016 -0700 Committer: Marcel Kornacker <[email protected]> Committed: Sun Oct 30 19:36:10 2016 +0000 ---------------------------------------------------------------------- be/src/exec/kudu-scanner.cc | 183 +++---------------- be/src/exec/kudu-scanner.h | 28 --- .../apache/impala/analysis/SlotDescriptor.java | 4 +- .../apache/impala/analysis/TupleDescriptor.java | 41 +++-- .../org/apache/impala/planner/KuduScanNode.java | 13 +- 5 files changed, 67 insertions(+), 202 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9f3f4b71/be/src/exec/kudu-scanner.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/kudu-scanner.cc b/be/src/exec/kudu-scanner.cc index ca4ee9a..d230985 100644 --- a/be/src/exec/kudu-scanner.cc +++ b/be/src/exec/kudu-scanner.cc @@ -58,19 +58,10 @@ KuduScanner::KuduScanner(KuduScanNode* scan_node, 
RuntimeState* state) : scan_node_(scan_node), state_(state), cur_kudu_batch_num_read_(0), - last_alive_time_micros_(0), - num_string_slots_(0) { + last_alive_time_micros_(0) { } Status KuduScanner::Open() { - // Store columns that need relocation when materialized into the - // destination row batch. - for (int i = 0; i < scan_node_->tuple_desc_->slots().size(); ++i) { - if (scan_node_->tuple_desc_->slots()[i]->type().IsStringType()) { - string_slots_.push_back(scan_node_->tuple_desc_->slots()[i]); - ++num_string_slots_; - } - } return scan_node_->GetConjunctCtxs(&conjunct_ctxs_); } @@ -109,7 +100,7 @@ Status KuduScanner::GetNext(RowBatch* row_batch, bool* eos) { RETURN_IF_CANCELLED(state_); if (cur_kudu_batch_num_read_ < cur_kudu_batch_.NumRows()) { - bool batch_done = false; + bool batch_done; RETURN_IF_ERROR(DecodeRowsIntoRowBatch(row_batch, &tuple, &batch_done)); if (batch_done) break; } @@ -172,54 +163,43 @@ Status KuduScanner::HandleEmptyProjection(RowBatch* row_batch, bool* batch_done) return Status::OK(); } -Status KuduScanner::DecodeRowsIntoRowBatch(RowBatch* row_batch, - Tuple** tuple_mem, bool* batch_done) { +Status KuduScanner::DecodeRowsIntoRowBatch(RowBatch* row_batch, Tuple** tuple_mem, + bool* batch_done) { + *batch_done = false; // Short-circuit the count(*) case. if (scan_node_->tuple_desc_->slots().empty()) { return HandleEmptyProjection(row_batch, batch_done); } - // TODO consider consolidating the tuple creation/initialization here with the version - // that happens inside the loop. - int idx = row_batch->AddRow(); - TupleRow* row = row_batch->GetRow(idx); - (*tuple_mem)->ClearNullBits(*scan_node_->tuple_desc()); - row->SetTuple(tuple_idx(), *tuple_mem); - + // Iterate through the Kudu rows, evaluate conjuncts and deep-copy survivors into + // 'row_batch'. + bool has_conjuncts = !conjunct_ctxs_.empty(); int num_rows = cur_kudu_batch_.NumRows(); - // Now iterate through the Kudu rows. 
for (int krow_idx = cur_kudu_batch_num_read_; krow_idx < num_rows; ++krow_idx) { - // Clear any NULL indicators set by a previous iteration. - (*tuple_mem)->ClearNullBits(*scan_node_->tuple_desc()); - - // Transform a Kudu row into an Impala row. + // Evaluate the conjuncts that haven't been pushed down to Kudu. Conjunct evaluation + // is performed directly on the Kudu tuple because its memory layout is identical to + // Impala's. We only copy the surviving tuples to Impala's output row batch. KuduScanBatch::RowPtr krow = cur_kudu_batch_.Row(krow_idx); - RETURN_IF_ERROR(KuduRowToImpalaTuple(krow, row_batch, *tuple_mem)); + Tuple* kudu_tuple = reinterpret_cast<Tuple*>(const_cast<void*>(krow.cell(0))); ++cur_kudu_batch_num_read_; - - // Evaluate the conjuncts that haven't been pushed down to Kudu. - if (conjunct_ctxs_.empty() || - ExecNode::EvalConjuncts(&conjunct_ctxs_[0], conjunct_ctxs_.size(), row)) { - // Materialize those slots that require auxiliary memory - RETURN_IF_ERROR(RelocateValuesFromKudu(*tuple_mem, row_batch->tuple_data_pool())); - // If the conjuncts pass on the row commit it. - row_batch->CommitLastRow(); - // If we've reached the capacity, or the LIMIT for the scan, return. - if (row_batch->AtCapacity() || scan_node_->ReachedLimit()) { - *batch_done = true; - break; - } - // Add another row. - idx = row_batch->AddRow(); - - // Move to the next tuple in the tuple buffer. - *tuple_mem = next_tuple(*tuple_mem); - (*tuple_mem)->ClearNullBits(*scan_node_->tuple_desc()); - // Make 'row' point to the new row. - row = row_batch->GetRow(idx); - row->SetTuple(tuple_idx(), *tuple_mem); + if (has_conjuncts && !ExecNode::EvalConjuncts(&conjunct_ctxs_[0], + conjunct_ctxs_.size(), reinterpret_cast<TupleRow*>(&kudu_tuple))) { + continue; + } + // Deep copy the tuple, set it in a new row, and commit the row. 
+ kudu_tuple->DeepCopy(*tuple_mem, *scan_node_->tuple_desc(), + row_batch->tuple_data_pool()); + TupleRow* row = row_batch->GetRow(row_batch->AddRow()); + row->SetTuple(0, *tuple_mem); + row_batch->CommitLastRow(); + // If we've reached the capacity, or the LIMIT for the scan, return. + if (row_batch->AtCapacity() || scan_node_->ReachedLimit()) { + *batch_done = true; + break; } + // Move to the next tuple in the tuple buffer. + *tuple_mem = next_tuple(*tuple_mem); } ExprContext::FreeLocalAllocations(conjunct_ctxs_); @@ -227,113 +207,6 @@ Status KuduScanner::DecodeRowsIntoRowBatch(RowBatch* row_batch, return state_->GetQueryStatus(); } -void KuduScanner::SetSlotToNull(Tuple* tuple, const SlotDescriptor& slot) { - DCHECK(slot.is_nullable()); - tuple->SetNull(slot.null_indicator_offset()); -} - -bool KuduScanner::IsSlotNull(Tuple* tuple, const SlotDescriptor& slot) { - return slot.is_nullable() && tuple->IsNull(slot.null_indicator_offset()); -} - -Status KuduScanner::RelocateValuesFromKudu(Tuple* tuple, MemPool* mem_pool) { - for (int i = 0; i < num_string_slots_; ++i) { - const SlotDescriptor* slot = string_slots_[i]; - // NULL handling was done in KuduRowToImpalaTuple. - if (IsSlotNull(tuple, *slot)) continue; - - // Extract the string value. - void* slot_ptr = tuple->GetSlot(slot->tuple_offset()); - DCHECK(slot->type().IsVarLenStringType()); - - // The string value of the slot has a pointer to memory from the Kudu row. - StringValue* val = reinterpret_cast<StringValue*>(slot_ptr); - char* old_buf = val->ptr; - // Kudu never returns values larger than 8MB - DCHECK_LE(val->len, 8 * (1 << 20)); - val->ptr = reinterpret_cast<char*>(mem_pool->TryAllocate(val->len)); - if (LIKELY(val->len > 0)) { - // The allocator returns a NULL ptr when out of memory. 
- if (UNLIKELY(val->ptr == NULL)) { - return mem_pool->mem_tracker()->MemLimitExceeded(state_, - "Kudu scanner could not allocate memory for string", val->len); - } - memcpy(val->ptr, old_buf, val->len); - } - } - return Status::OK(); -} - - -Status KuduScanner::KuduRowToImpalaTuple(const KuduScanBatch::RowPtr& row, - RowBatch* row_batch, Tuple* tuple) { - for (int i = 0; i < scan_node_->tuple_desc_->slots().size(); ++i) { - const SlotDescriptor* info = scan_node_->tuple_desc_->slots()[i]; - void* slot = tuple->GetSlot(info->tuple_offset()); - - if (row.IsNull(i)) { - SetSlotToNull(tuple, *info); - continue; - } - - int max_len = -1; - switch (info->type().type) { - case TYPE_VARCHAR: - max_len = info->type().len; - DCHECK_GT(max_len, 0); - // Fallthrough intended. - case TYPE_STRING: { - // For types with auxiliary memory (String, Binary,...) store the original memory - // location in the tuple to avoid the copy when the conjuncts do not pass. Relocate - // the memory into the row batch's memory in a later step. 
- kudu::Slice slice; - KUDU_RETURN_IF_ERROR(row.GetString(i, &slice), - "Error getting column value from Kudu."); - StringValue* sv = reinterpret_cast<StringValue*>(slot); - sv->ptr = const_cast<char*>(reinterpret_cast<const char*>(slice.data())); - sv->len = static_cast<int>(slice.size()); - if (max_len > 0) sv->len = std::min(sv->len, max_len); - break; - } - case TYPE_TINYINT: - KUDU_RETURN_IF_ERROR(row.GetInt8(i, reinterpret_cast<int8_t*>(slot)), - "Error getting column value from Kudu."); - break; - case TYPE_SMALLINT: - KUDU_RETURN_IF_ERROR(row.GetInt16(i, reinterpret_cast<int16_t*>(slot)), - "Error getting column value from Kudu."); - break; - case TYPE_INT: - KUDU_RETURN_IF_ERROR(row.GetInt32(i, reinterpret_cast<int32_t*>(slot)), - "Error getting column value from Kudu."); - break; - case TYPE_BIGINT: - KUDU_RETURN_IF_ERROR(row.GetInt64(i, reinterpret_cast<int64_t*>(slot)), - "Error getting column value from Kudu."); - break; - case TYPE_FLOAT: - KUDU_RETURN_IF_ERROR(row.GetFloat(i, reinterpret_cast<float*>(slot)), - "Error getting column value from Kudu."); - break; - case TYPE_DOUBLE: - KUDU_RETURN_IF_ERROR(row.GetDouble(i, reinterpret_cast<double*>(slot)), - "Error getting column value from Kudu."); - break; - case TYPE_BOOLEAN: - KUDU_RETURN_IF_ERROR(row.GetBool(i, reinterpret_cast<bool*>(slot)), - "Error getting column value from Kudu."); - break; - default: - DCHECK(false) << "Impala type unsupported in Kudu: " - << TypeToString(info->type().type); - return Status(TErrorCode::IMPALA_KUDU_TYPE_MISSING, - TypeToString(info->type().type)); - } - } - return Status::OK(); -} - - Status KuduScanner::GetNextScannerBatch() { SCOPED_TIMER(state_->total_storage_wait_timer()); int64_t now = MonotonicMicros(); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9f3f4b71/be/src/exec/kudu-scanner.h ---------------------------------------------------------------------- diff --git a/be/src/exec/kudu-scanner.h b/be/src/exec/kudu-scanner.h index 
0ed5221..bf84b08 100644 --- a/be/src/exec/kudu-scanner.h +++ b/be/src/exec/kudu-scanner.h @@ -63,12 +63,6 @@ class KuduScanner { /// Does this by adding sets of rows to 'row_batch' instead of adding one-by-one. Status HandleEmptyProjection(RowBatch* row_batch, bool* batch_done); - /// Set 'slot' to Null in 'tuple'. - void SetSlotToNull(Tuple* tuple, const SlotDescriptor& slot); - - /// Returns true if 'slot' is Null in 'tuple'. - bool IsSlotNull(Tuple* tuple, const SlotDescriptor& slot); - /// Decodes rows previously fetched from kudu, now in 'cur_rows_' into a RowBatch. /// - 'batch' is the batch that will point to the new tuples. /// - *tuple_mem should be the location to output tuples. @@ -82,26 +76,11 @@ class KuduScanner { /// Closes the current kudu::client::KuduScanner. void CloseCurrentClientScanner(); - /// Given a tuple, copies the values of those columns that require additional memory - /// from memory owned by the kudu::client::KuduScanner into memory owned by the - /// RowBatch. Assumes that the other columns are already materialized. - Status RelocateValuesFromKudu(Tuple* tuple, MemPool* mem_pool); - - /// Transforms a kudu row into an Impala row. Columns that don't require auxiliary - /// memory are copied to the tuple directly. String columns are stored as a reference to - /// the memory of the RowPtr and need to be relocated later. - Status KuduRowToImpalaTuple(const kudu::client::KuduScanBatch::RowPtr& row, - RowBatch* row_batch, Tuple* tuple); - inline Tuple* next_tuple(Tuple* t) const { uint8_t* mem = reinterpret_cast<uint8_t*>(t); return reinterpret_cast<Tuple*>(mem + scan_node_->tuple_desc()->byte_size()); } - /// Returns the tuple idx into the row for this scan node to output to. - /// Currently this is always 0. - int tuple_idx() const { return 0; } - KuduScanNode* scan_node_; RuntimeState* state_; @@ -120,13 +99,6 @@ class KuduScanner { /// The scanner's cloned copy of the conjuncts to apply. 
vector<ExprContext*> conjunct_ctxs_; - - /// List of string slots that need relocation for their auxiliary memory. - std::vector<SlotDescriptor*> string_slots_; - - /// Number of string slots that need relocation (i.e. size of string_slots_), stored - /// separately to avoid calling vector::size() in the hot path (IMPALA-3348). - int num_string_slots_; }; } /// namespace impala http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9f3f4b71/fe/src/main/java/org/apache/impala/analysis/SlotDescriptor.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/analysis/SlotDescriptor.java b/fe/src/main/java/org/apache/impala/analysis/SlotDescriptor.java index 3a0fc06..9a9c058 100644 --- a/fe/src/main/java/org/apache/impala/analysis/SlotDescriptor.java +++ b/fe/src/main/java/org/apache/impala/analysis/SlotDescriptor.java @@ -25,6 +25,7 @@ import org.apache.impala.catalog.ColumnStats; import org.apache.impala.catalog.KuduColumn; import org.apache.impala.catalog.Type; import org.apache.impala.thrift.TSlotDescriptor; + import com.google.common.base.Joiner; import com.google.common.base.Objects; import com.google.common.base.Preconditions; @@ -140,9 +141,8 @@ public class SlotDescriptor { } public Path getPath() { return path_; } - public boolean isScanSlot() { return path_ != null && path_.isRootedAtTable(); } - + public boolean isKuduScanSlot() { return getColumn() instanceof KuduColumn; } public Column getColumn() { return !isScanSlot() ? 
null : path_.destColumn(); } public ColumnStats getStats() { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9f3f4b71/fe/src/main/java/org/apache/impala/analysis/TupleDescriptor.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/analysis/TupleDescriptor.java b/fe/src/main/java/org/apache/impala/analysis/TupleDescriptor.java index 5fbe5f6..bf6b93a 100644 --- a/fe/src/main/java/org/apache/impala/analysis/TupleDescriptor.java +++ b/fe/src/main/java/org/apache/impala/analysis/TupleDescriptor.java @@ -19,6 +19,7 @@ package org.apache.impala.analysis; import java.util.ArrayList; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -28,7 +29,6 @@ import org.apache.impala.catalog.ColumnStats; import org.apache.impala.catalog.HdfsTable; import org.apache.impala.catalog.StructType; import org.apache.impala.catalog.Table; -import org.apache.impala.catalog.View; import org.apache.impala.thrift.TTupleDescriptor; import com.google.common.base.Joiner; @@ -59,8 +59,9 @@ import com.google.common.collect.Lists; * * Memory Layout * Slots are placed in descending order by size with trailing bytes to store null flags. - * Null flags are omitted for non-nullable slots. There is no padding between tuples when - * stored back-to-back in a row batch. + * Null flags are omitted for non-nullable slots, except for Kudu scan slots which always + * have a null flag to match Kudu's client row format. There is no padding between tuples + * when stored back-to-back in a row batch. * * Example: select bool_col, int_col, string_col, smallint_col from functional.alltypes * Slots: string_col|int_col|smallint_col|bool_col|null_byte @@ -118,6 +119,21 @@ public class TupleDescriptor { return result; } + /** + * Returns all materialized slots ordered by their offset. Valid to call after the + * mem layout has been computed. 
+ */ + public ArrayList<SlotDescriptor> getSlotsOrderedByOffset() { + Preconditions.checkState(hasMemLayout_); + ArrayList<SlotDescriptor> result = getMaterializedSlots(); + Collections.sort(result, new Comparator<SlotDescriptor> () { + public int compare(SlotDescriptor a, SlotDescriptor b) { + return Integer.compare(a.getByteOffset(), b.getByteOffset()); + } + }); + return result; + } + public Table getTable() { if (path_ == null) return null; return path_.getRootTable(); @@ -199,9 +215,7 @@ public class TupleDescriptor { * Materialize all slots. */ public void materializeSlots() { - for (SlotDescriptor slot: slots_) { - slot.setIsMaterialized(true); - } + for (SlotDescriptor slot: slots_) slot.setIsMaterialized(true); } public TTupleDescriptor toThrift(Integer tableId) { @@ -223,7 +237,7 @@ public class TupleDescriptor { new HashMap<Integer, List<SlotDescriptor>>(); // populate slotsBySize - int numNullableSlots = 0; + int numNullBits = 0; int totalSlotSize = 0; for (SlotDescriptor d: slots_) { if (!d.isMaterialized()) continue; @@ -239,14 +253,14 @@ public class TupleDescriptor { } totalSlotSize += d.getType().getSlotSize(); slotsBySize.get(d.getType().getSlotSize()).add(d); - if (d.getIsNullable()) ++numNullableSlots; + if (d.getIsNullable() || d.isKuduScanSlot()) ++numNullBits; } // we shouldn't have anything of size <= 0 Preconditions.checkState(!slotsBySize.containsKey(0)); Preconditions.checkState(!slotsBySize.containsKey(-1)); // assign offsets to slots in order of descending size - numNullBytes_ = (numNullableSlots + 7) / 8; + numNullBytes_ = (numNullBits + 7) / 8; int slotOffset = 0; int nullIndicatorByte = totalSlotSize; int nullIndicatorBit = 0; @@ -266,13 +280,16 @@ public class TupleDescriptor { slotOffset += slotSize; // assign null indicator - if (d.getIsNullable()) { + if (d.getIsNullable() || d.isKuduScanSlot()) { d.setNullIndicatorByte(nullIndicatorByte); d.setNullIndicatorBit(nullIndicatorBit); nullIndicatorBit = (nullIndicatorBit + 1) % 8; if 
(nullIndicatorBit == 0) ++nullIndicatorByte; - } else { - // non-nullable slots will have 0 for the byte offset and -1 for the bit mask + } + // non-nullable slots have 0 for the byte offset and -1 for the bit mask + // to make sure IS NULL always evaluates to false in the BE without having + // to check nullability explicitly + if (!d.getIsNullable()) { d.setNullIndicatorBit(-1); d.setNullIndicatorByte(0); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9f3f4b71/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java index 9434801..d338608 100644 --- a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java +++ b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java @@ -118,13 +118,16 @@ public class KuduScanNode extends ScanNode { // Materialize the slots of the remaining conjuncts (i.e. those not pushed to Kudu) analyzer.materializeSlots(conjuncts_); + // Compute mem layout before the scan range locations because creation of the Kudu + // scan tokens depends on having a mem layout. + computeMemLayout(analyzer); + // Creates Kudu scan tokens and sets the scan range locations. computeScanRangeLocations(analyzer, client, rpcTable); } catch (Exception e) { throw new ImpalaRuntimeException("Unable to initialize the Kudu scan node", e); } - computeMemLayout(analyzer); computeStats(analyzer); } @@ -189,15 +192,15 @@ public class KuduScanNode extends ScanNode { /** * Returns KuduScanTokens for this scan given the projected columns and predicates that - * will be pushed to Kudu. + * will be pushed to Kudu. The projected Kudu columns are ordered by offset in an + * Impala tuple to make the Impala and Kudu tuple layouts identical. 
*/ private List<KuduScanToken> createScanTokens(KuduClient client, org.apache.kudu.client.KuduTable rpcTable) { List<String> projectedCols = Lists.newArrayList(); - for (SlotDescriptor desc: getTupleDesc().getSlots()) { - if (desc.isMaterialized()) projectedCols.add(desc.getColumn().getName()); + for (SlotDescriptor desc: getTupleDesc().getSlotsOrderedByOffset()) { + projectedCols.add(desc.getColumn().getName()); } - KuduScanTokenBuilder tokenBuilder = client.newScanTokenBuilder(rpcTable); tokenBuilder.setProjectedColumnNames(projectedCols); for (KuduPredicate predicate: kuduPredicates_) tokenBuilder.addPredicate(predicate);
