Alex Behm has posted comments on this change.

Change subject: IMPALA-3905: Add HdfsScanner::GetNext() interface and implementation for Parquet.
......................................................................
Patch Set 2:

(19 comments)

http://gerrit.cloudera.org:8080/#/c/3732/2/be/src/exec/base-sequence-scanner.cc
File be/src/exec/base-sequence-scanner.cc:

Line 139:   scan_node_->ThreadTokenAvailableCb(state_->resource_pool());
> Has this just been moved from somewhere else? I don't understand this part
Sorry, we don't need this change in this patch. We'll eventually need it, but I'll revert it for now (and elsewhere). It was moved out of HdfsScanNode::AddDiskIoRanges() to decouple adding scan ranges from creating scanner threads.


http://gerrit.cloudera.org:8080/#/c/3732/2/be/src/exec/hdfs-parquet-scanner.cc
File be/src/exec/hdfs-parquet-scanner.cc:

Line 139:     skip_row_group_(true),
> It looks like it's set to false before it's used anyway, but it's counteri
This needs to start out as true so that we advance to the very first row group; it's explained in the header comment of this member. Renamed to advance_row_group_, maybe that's clearer.


Line 170:   DCHECK(parse_status_.ok()) << "Invalid parse_status_" << parse_status_.GetDetail();
> It seems like a lot of the below work corresponds more to the Open() phase
Works for me. Done.


PS2, Line 236: NULL
> Passing NULL argument to reset() is redundant.
Done


Line 310:   assemble_rows_timer_.Start();
> Seems easier to use a scoped timer here and avoid splitting the function ca
It's a little tricky because IO wait time is excluded from this timer (i.e., it is started/stopped deep down in the column readers). Maybe we don't need this much granularity and one scoped timer for GetNext() is ok? Take a look at the new solution.


Line 312:   int max_tuples = min<int64_t>(row_batch->capacity(), rows_remaining);
> Why the int64_t here? Everything else involved is an int.
Done


Line 325:   assemble_rows_timer_.Start();
> SCOPED_TIMER?
See above.


Line 354:   if (!scan_node_->PartitionPassesFilterPredicates(
> Wrapping seems weird here - ok if it's just a unfortunate consequence of th
I renamed the lengthy function to save some chars.
Hopefully it looks a little cleaner now.


Line 402:   // This row group will be handled by a different scanner.
> Maybe explain briefly why? It is because it's included in the previous or n
I think it could be either the next or previous split, depending on which of the conditions above fails. I added a comment to hopefully clarify.


PS2, Line 409: seeding_succeeded
> seeded_ok? It's a bit of a mouthful.
Done


Line 439: void HdfsParquetScanner::TransferResources(RowBatch* row_batch) {
> It's not immediately clear what resources this is transferring and when it'
Renamed to FlushFinal() and added more comments. Let me know if it's still unclear.


Line 443:   for (ParquetColumnReader* r: column_readers_) r->TransferResources(row_batch);
> I don't feel strongly about this, but we don't generally do one-line for st
Too bad, I like one-liners :). Done.


PS2, Line 514: scan node
> Update comment - there won't necessarily be a scan node.
I think there will always be a scan node, even for single-threaded scans.


http://gerrit.cloudera.org:8080/#/c/3732/2/be/src/exec/hdfs-scan-node.cc
File be/src/exec/hdfs-scan-node.cc:

PS2, Line 270:     ThreadTokenAvailableCb(runtime_state_->resource_pool());
> Just curious, how would it help moving this out of AddDiskIoRanges() other
I reverted these changes for now, but they may eventually be needed to decouple adding scan ranges from creating scanner threads (see HdfsScanNode::AddDiskIoRanges()). Sailesh, these changes are intended to help with the new single-threaded scan node (not included in this patch), which still issues scan ranges but does not start up scanner threads. Let's tackle that part later.


Line 876:
> Extraneous new line.
Done


http://gerrit.cloudera.org:8080/#/c/3732/2/be/src/exec/hdfs-scanner.h
File be/src/exec/hdfs-scanner.h:

Line 114: /// The memory referenced by tuples in the returned batch is valid until Close().
> This isn't the guarantee that ExecNode::GetNext() provides (actually I'm no
Correct, the guarantees we need to provide here are slightly different from the ExecNode ones if we want to be able to implement ProcessSplit() via GetNext(). We need to make sure that the memory of batches in the row batch queue remains valid, so it's not sufficient for the memory to be valid only until GetNext() is called again. Your wording sounds good. Used that, thanks!


PS2, Line 115: bool* eos
> One idea I've thought about for ExecNode is that it might be simpler to mak
I don't think it makes much difference in this patch. I'll keep it in mind when actually adding the new scan node; maybe it makes more sense then. I agree that having a HasNext() call might be more convenient than returning eos in GetNext().


Line 124:   virtual void TransferResources(RowBatch* row_batch) {
> I'm not sure that it makes sense to make this as part of the HdfsScanner in
Moved these changes to HdfsScanner::Close() instead.


http://gerrit.cloudera.org:8080/#/c/3732/2/be/src/exec/parquet-column-readers.h
File be/src/exec/parquet-column-readers.h:

Line 228:   virtual void TransferResources(RowBatch* row_batch) = 0;
> When is it safe to call this?
I made this Close() instead to clarify.


--
To view, visit http://gerrit.cloudera.org:8080/3732
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: Iab50770bac05afcda4d3404fb4f53a0104931eb0
Gerrit-PatchSet: 2
Gerrit-Project: Impala
Gerrit-Branch: cdh5-trunk
Gerrit-Owner: Alex Behm <[email protected]>
Gerrit-Reviewer: Alex Behm <[email protected]>
Gerrit-Reviewer: Sailesh Mukil <[email protected]>
Gerrit-Reviewer: Tim Armstrong <[email protected]>
Gerrit-HasComments: Yes
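
[Editor's note] For readers following the GetNext()/eos discussion in the hdfs-scanner.h thread above, here is a minimal, self-contained sketch of the pull-style contract being debated: the caller repeatedly calls GetNext() until the scanner sets *eos. All names and types here (Scanner, RowBatch) are illustrative stand-ins, not Impala's actual classes.

```cpp
#include <cassert>
#include <vector>

// Hypothetical stand-in for a row batch; real tuple memory is elsewhere.
struct RowBatch {
  std::vector<int> rows;
};

// Hypothetical, simplified mirror of the GetNext() contract discussed above.
class Scanner {
 public:
  explicit Scanner(int total_rows) : remaining_(total_rows) {}

  // Fills 'batch' with up to 'capacity' rows and sets *eos once the last
  // row has been handed out. In the design discussed in the review, the
  // memory backing returned batches stays valid until Close(), so callers
  // may queue batches before consuming them.
  void GetNext(RowBatch* batch, int capacity, bool* eos) {
    batch->rows.clear();
    int n = capacity < remaining_ ? capacity : remaining_;
    for (int i = 0; i < n; ++i) batch->rows.push_back(i);
    remaining_ -= n;
    *eos = (remaining_ == 0);
  }

 private:
  int remaining_;
};
```

The alternative Tim raises would replace the out-parameter with a separate HasNext() query, so the loop becomes `while (s.HasNext()) s.GetNext(&b, capacity);`, trading an extra virtual call per batch for a simpler GetNext() signature.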
