http://git-wip-us.apache.org/repos/asf/impala/blob/07fd3320/be/src/exec/hdfs-parquet-scanner.h ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-parquet-scanner.h b/be/src/exec/hdfs-parquet-scanner.h deleted file mode 100644 index d559b8e..0000000 --- a/be/src/exec/hdfs-parquet-scanner.h +++ /dev/null @@ -1,654 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - - -#ifndef IMPALA_EXEC_HDFS_PARQUET_SCANNER_H -#define IMPALA_EXEC_HDFS_PARQUET_SCANNER_H - -#include "codegen/impala-ir.h" -#include "exec/hdfs-scanner.h" -#include "exec/parquet-common.h" -#include "exec/parquet-scratch-tuple-batch.h" -#include "exec/parquet-metadata-utils.h" -#include "runtime/scoped-buffer.h" -#include "util/runtime-profile-counters.h" - -namespace impala { - -class CollectionValueBuilder; -struct HdfsFileDesc; - -/// Internal schema representation and resolution. -struct SchemaNode; - -/// Class that implements Parquet definition and repetition level decoding. -class ParquetLevelDecoder; - -/// Per column reader. -class ParquetColumnReader; -class CollectionColumnReader; -class BaseScalarColumnReader; -template<typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED> -class ScalarColumnReader; -class BoolColumnReader; - -/// This scanner parses Parquet files located in HDFS, and writes the content as tuples in -/// the Impala in-memory representation of data, e.g. (tuples, rows, row batches). -/// For the file format spec, see: github.com/apache/parquet-format -/// -/// ---- Schema resolution ---- -/// Additional columns are allowed at the end in either the table or file schema (i.e., -/// extra columns at the end of the schema or extra fields at the end of a struct). If -/// there are extra columns in the file schema, they are simply ignored. If there are -/// extra in the table schema, we return NULLs for those columns (if they're -/// materialized). -/// -/// ---- Disk IO ---- -/// Parquet (and other columnar formats) use scan ranges differently than other formats. -/// Each materialized column maps to a single ScanRange per row group. For streaming -/// reads, all the columns need to be read in parallel. This is done by issuing one -/// ScanRange (in IssueInitialRanges()) for the file footer per split. -/// ProcessSplit() is called once for each original split and determines the row groups -/// whose midpoints fall within that split. We use the mid-point to determine whether a -/// row group should be processed because if the row group size is less than or equal to -/// the split size, the mid point guarantees that we have at least 50% of the row group in -/// the current split. ProcessSplit() then computes the column ranges for these row groups -/// and submits them to the IoMgr for immediate scheduling (so they don't surface in -/// RequestContext::GetNextUnstartedRange()). Scheduling them immediately also guarantees -/// they are all read at once. -/// -/// Like the other scanners, each parquet scanner object is one to one with a -/// ScannerContext. Unlike the other scanners though, the context will have multiple -/// streams, one for each column. Row groups are processed one at a time this way. -/// -/// ---- Nested types ---- -/// This scanner supports reading and materializing nested data. For a good overview of -/// how nested data is encoded, see blog.twitter.com/2013/dremel-made-simple-with-parquet. -/// For how SQL nested schemas are translated to parquet schemas, see -/// github.com/apache/parquet-format/blob/master/LogicalTypes.md#nested-types. -/// -/// Examples: -/// For these examples, we will use the following table definition: -/// tbl: -/// id bigint -/// array_col array<array<int>> -/// -/// The table definition could correspond to the following parquet schema (note the -/// required 'id' field. If written by Impala, all non-repeated fields would be optional, -/// but we can read repeated fields as well): -/// -/// required group record d=0 r=0 -/// req int64 id d=0 r=0 -/// opt group array_col (LIST) d=1 r=0 -/// repeated group list d=2 r=1 -/// opt group item (LIST) d=3 r=1 -/// repeated group list d=4 r=2 -/// opt int32 item d=5 r=2 -/// -/// Each element in the schema has been annotated with the maximum def level and maximum -/// rep level corresponding to that element. Note that the repeated elements add a def -/// level. This distinguishes between 0 items (empty list) and more than 0 items -/// (non-empty list). The containing optional LIST element for each array determines -/// whether the whole list is null or non-null. Maps work the same way, the only -/// differences being that the repeated group contains two child fields ("key" and "value" -/// instead of "item"), and the outer element is annotated with MAP instead of LIST. -/// -/// Only scalar schema elements are materialized in parquet files; internal nested -/// elements can be reconstructed using the def and rep levels. To illustrate this, here -/// is data containing every valid definition and repetition for the materialized int -/// 'item' element. The data records appear on the left, the encoded definition levels, -/// repetition levels, and values for the 'item' field appear on the right (the encoded -/// 'id' field is not shown). -/// -/// record d r v -/// ------------------------------------ -/// {id: 0, array_col: NULL} 0 0 - -/// {id: 1, array_col: []} 1 0 - -/// {id: 2, array_col: [NULL]} 2 0 - -/// {id: 3, array_col: [[]]} 3 0 - -/// {id: 4, array_col: [[NULL]]} 4 0 - -/// {id: 5, array_col: [[1, 5 0 1 -/// NULL], 4 2 - -/// [2]]} 5 1 2 -/// {id: 6, array_col: [[3]]} 5 0 3 -/// -/// * Example query 1: -/// select id, inner.item from tbl t, t.array_col outer, outer.item inner -/// Results from above sample data: -/// 4,NULL -/// 5,1 -/// 5,NULL -/// 5,2 -/// 6,3 -/// -/// Descriptors: -/// Tuple(id=0 tuple_path=[] slots=[ -/// Slot(id=0 type=ARRAY col_path=[1] collection_item_tuple_id=1), -/// Slot(id=2 type=BIGINT col_path=[0])]) -/// Tuple(id=1 tuple_path=[1] slots=[ -/// Slot(id=1 type=ARRAY col_path=[1,0] collection_item_tuple_id=2)]) -/// Tuple(id=2 tuple_path=[1, 0] slots=[ -/// Slot(id=3 type=INT col_path=[1,0,0])]) -/// -/// The parquet scanner will materialize the following in-memory row batch: -/// RowBatch -/// +==========+ -/// | 0 | NULL | -/// |----------| -/// | 1 | NULL | outer -/// |----------| +======+ -/// | 2 | --------->| NULL | -/// | | | +======+ -/// |----------| -/// | | | +======+ -/// | 3 | --------->| NULL | -/// | | | +======+ -/// | | | inner -/// |----------| +======+ +======+ -/// | 4 | --------->| -------->| NULL | -/// | | | +======+ +======+ -/// | | | -/// |----------| +======+ +======+ -/// | 5 | --------->| -------->| 1 | -/// | | | | | +------+ -/// | | | | | | NULL | -/// | | | +------+ +======+ -/// | | | | | -/// | | | | | +======+ -/// | | | | -------->| 2 | -/// | | | +======+ +======+ -/// | | | -/// |----------| +======+ +======+ -/// | 6 | --------->| -------->| 3 | -/// +==========+ +======+ +======+ -/// -/// The top-level row batch contains two slots, one containing the int64_t 'id' slot and -/// the other containing the CollectionValue 'array_col' slot. The CollectionValues in -/// turn contain pointers to their item tuple data. Each item tuple contains a single -/// ArrayColumn slot ('array_col.item'). The inner CollectionValues' item tuples contain -/// a single int 'item' slot. -/// -/// Note that the scanner materializes a NULL CollectionValue for empty collections. -/// This is technically a bug (it should materialize a CollectionValue with num_tuples = -/// 0), but we don't distinguish between these two cases yet. -/// TODO: fix this (IMPALA-2272) -/// -/// The column readers that materialize this structure form a tree analogous to the -/// materialized output: -/// CollectionColumnReader slot_id=0 node="repeated group list (d=2 r=1)" -/// CollectionColumnReader slot_id=1 node="repeated group list (d=4 r=2)" -/// ScalarColumnReader<int32_t> slot_id=3 node="opt int32 item (d=5 r=2)" -/// ScalarColumnReader<int64_t> slot_id=2 node="req int64 id (d=0 r=0)" -/// -/// Note that the collection column readers reference the "repeated group item" schema -/// element of the serialized array, not the outer "opt group" element. This is what -/// causes the bug described above, it should consider both elements. -/// -/// * Example query 2: -/// select inner.item from tbl.array_col.item inner; -/// Results from the above sample data: -/// NULL -/// 1 -/// NULL -/// 2 -/// 3 -/// -/// Descriptors: -/// Tuple(id=0 tuple_path=[1, 0] slots=[ -/// Slot(id=0 type=INT col_path=[1,0,0])]) -/// -/// In-memory row batch: -/// +======+ -/// | NULL | -/// |------| -/// | 1 | -/// |------| -/// | NULL | -/// |------| -/// | 2 | -/// |------| -/// | 3 | -/// +======+ -/// -/// Column readers: -/// ScalarColumnReader<int32_t> slot_id=0 node="opt int32 item (d=5 r=2)" -/// -/// In this example, the scanner doesn't materialize a nested in-memory result, since -/// only the single int 'item' slot is materialized. However, it still needs to read the -/// nested data as shown above. An important point to notice is that a tuple is not -/// materialized for every rep and def level pair read -- there are 9 of these pairs -/// total in the sample data above, but only 5 tuples are materialized. This is because -/// in this case, nothing should be materialized for NULL or empty arrays, since we're -/// only materializing the innermost item. If a def level is read that doesn't -/// correspond to any item value (NULL or otherwise), the scanner advances to the next -/// rep and def levels without materializing a tuple. -/// -/// * Example query 3: -/// select id, inner.item from tbl t, t.array_col.item inner -/// Results from the above sample data (same as example 1): -/// 4,NULL -/// 5,1 -/// 5,NULL -/// 5,2 -/// 6,3 -/// -/// Descriptors: -/// Tuple(id=0 tuple_path=[] slots=[ -/// Slot(id=0 type=ARRAY col_path=[2]), -/// Slot(id=1 type=BIGINT col_path=[0])]) -/// Tuple(id=1 tuple_path=[2, 0] slots=[ -/// Slot(id=2 type=INT col_path=[2,0,0])]) -/// -/// In-memory row batch: -/// RowBatch -/// +==========+ -/// | 0 | NULL | -/// |----------| -/// | 1 | NULL | -/// |----------| inner -/// | 2 | --------->+======+ -/// | | | +======+ -/// |----------| -/// | | | -/// | 3 | --------->+======+ -/// | | | +======+ -/// | | | -/// |----------| +======+ -/// | 4 | --------->| NULL | -/// | | | +======+ -/// | | | -/// |----------| +======+ -/// | 5 | --------->| 1 | -/// | | | +------+ -/// | | | | NULL | -/// | | | +------+ -/// | | | | 2 | -/// | | | +======+ -/// | | | -/// |----------| +======+ -/// | 6 | --------->| 3 | -/// +==========+ +======+ -/// -/// Column readers: -/// CollectionColumnReader slot_id=0 node="repeated group list (d=2 r=1)" -/// ScalarColumnReader<int32_t> slot_id=2 node="opt int32 item (d=5 r=2)" -/// ScalarColumnReader<int32_t> id=1 node="req int64 id (d=0 r=0)" -/// -/// In this example, the scanner materializes a "flattened" version of inner, rather -/// than the full 3-level structure. Note that the collection reader references the -/// outer array, which determines how long each materialized array is, and the items in -/// the array are from the inner array. -/// -/// ---- Slot materialization ---- -/// Top-level tuples: -/// The slots of top-level tuples are populated in a column-wise fashion. Each column -/// reader materializes a batch of values into a temporary 'scratch batch'. Once a -/// scratch batch has been fully populated, runtime filters and conjuncts are evaluated -/// against the scratch tuples, and the surviving tuples are set in the output batch that -/// is handed to the scan node. The ownership of tuple memory is transferred from a -/// scratch batch to an output row batch once all tuples in the scratch batch have either -/// been filtered or returned as part of an output batch. -/// -/// Collection items: -/// Unlike the top-level tuples, the item tuples of CollectionValues are populated in -/// a row-wise fashion because doing it column-wise has the following challenges. -/// First, we would need to allocate a scratch batch for every collection-typed slot -/// which could consume a lot of memory. Then we'd need a similar mechanism to transfer -/// tuples that survive conjuncts to an output collection. However, CollectionValues lack -/// the row indirection that row batches have, so we would need to either deep copy the -/// surviving tuples, or come up with a different mechanism altogether. -/// TODO: Populating CollectionValues in a column-wise fashion seems different enough -/// and less critical for most of our users today to defer this task until later. -/// -/// ---- Runtime filters ---- -/// HdfsParquetScanner is able to apply runtime filters that arrive before or during -/// scanning. Filters are applied at both the row group (see AssembleRows()) and row (see -/// ReadRow()) scope. If all filter predicates do not pass, the row or row group will be -/// excluded from output. Only partition-column filters are applied at AssembleRows(). The -/// FilterContexts for these filters are cloned from the parent scan node and attached to -/// the ScannerContext. -class HdfsParquetScanner : public HdfsScanner { - public: - HdfsParquetScanner(HdfsScanNodeBase* scan_node, RuntimeState* state); - virtual ~HdfsParquetScanner() {} - - /// Issue just the footer range for each file. We'll then parse the footer and pick - /// out the columns we want. 'files' must not be empty. - static Status IssueInitialRanges(HdfsScanNodeBase* scan_node, - const std::vector<HdfsFileDesc*>& files) - WARN_UNUSED_RESULT; - - virtual Status Open(ScannerContext* context) WARN_UNUSED_RESULT; - virtual Status ProcessSplit() WARN_UNUSED_RESULT; - virtual void Close(RowBatch* row_batch); - - /// Codegen ProcessScratchBatch(). Stores the resulting function in - /// 'process_scratch_batch_fn' if codegen was successful or NULL otherwise. - static Status Codegen(HdfsScanNodeBase* node, - const std::vector<ScalarExpr*>& conjuncts, - llvm::Function** process_scratch_batch_fn) - WARN_UNUSED_RESULT; - - /// Initializes a ParquetTimestampDecoder depending on writer, timezone, and the schema - /// of the column. - ParquetTimestampDecoder CreateTimestampDecoder(const parquet::SchemaElement& element); - - /// The rep and def levels are set to this value to indicate the end of a row group. - static const int16_t ROW_GROUP_END = numeric_limits<int16_t>::min(); - /// Indicates an invalid definition or repetition level. - static const int16_t INVALID_LEVEL = -1; - /// Indicates an invalid position value. - static const int16_t INVALID_POS = -1; - - /// Class name in LLVM IR. - static const char* LLVM_CLASS_NAME; - - private: - friend class ParquetColumnReader; - friend class CollectionColumnReader; - friend class BaseScalarColumnReader; - template<typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED> - friend class ScalarColumnReader; - friend class BoolColumnReader; - friend class HdfsParquetScannerTest; - - /// Index of the current row group being processed. Initialized to -1 which indicates - /// that we have not started processing the first row group yet (GetNext() has not yet - /// been called). - int32_t row_group_idx_; - - /// Counts the number of rows processed for the current row group. - int64_t row_group_rows_read_; - - /// Indicates whether we should advance to the next row group in the next GetNext(). - /// Starts out as true to move to the very first row group. - bool advance_row_group_; - - boost::scoped_ptr<ParquetSchemaResolver> schema_resolver_; - - /// Tuple to hold values when reading parquet::Statistics. Owned by perm_pool_. - Tuple* min_max_tuple_; - - /// Clone of Min/max statistics conjunct evaluators. Has the same life time as - /// the scanner. Stored in 'obj_pool_'. - vector<ScalarExprEvaluator*> min_max_conjunct_evals_; - - /// Pool used for allocating caches of definition/repetition levels and tuples for - /// dictionary filtering. The definition/repetition levels are populated by the - /// level readers. The pool is freed in Close(). - boost::scoped_ptr<MemPool> perm_pool_; - - /// Number of scratch batches processed so far. - int64_t row_batches_produced_; - - /// Column reader for each top-level materialized slot in the output tuple. - std::vector<ParquetColumnReader*> column_readers_; - - /// Column readers will write slot values into this scratch batch for - /// top-level tuples. See AssembleRows(). - boost::scoped_ptr<ScratchTupleBatch> scratch_batch_; - - /// File metadata thrift object - parquet::FileMetaData file_metadata_; - - /// Version of the application that wrote this file. - ParquetFileVersion file_version_; - - /// Scan range for the metadata. - const io::ScanRange* metadata_range_; - - /// Pool to copy dictionary page buffer into. This pool is shared across all the - /// pages in a column chunk. - boost::scoped_ptr<MemPool> dictionary_pool_; - - /// Column readers that are eligible for dictionary filtering. - /// These are pointers to elements of column_readers_. Materialized columns that are - /// dictionary encoded correspond to scalar columns that are either top-level columns - /// or nested within a collection. CollectionColumnReaders are not eligible for - /// dictionary filtering so are not included. - std::vector<BaseScalarColumnReader*> dict_filterable_readers_; - - /// Column readers that are not eligible for dictionary filtering. - /// These are pointers to elements of column_readers_. The readers are either top-level - /// or nested within a collection. - std::vector<BaseScalarColumnReader*> non_dict_filterable_readers_; - - /// Flattened list of all scalar column readers in column_readers_. - std::vector<BaseScalarColumnReader*> scalar_readers_; - - /// Flattened collection column readers that point to readers in column_readers_. - std::vector<CollectionColumnReader*> collection_readers_; - - /// Memory used to store the tuples used for dictionary filtering. Tuples owned by - /// perm_pool_. - std::unordered_map<const TupleDescriptor*, Tuple*> dict_filter_tuple_map_; - - /// Timer for materializing rows. This ignores time getting the next buffer. - ScopedTimer<MonotonicStopWatch> assemble_rows_timer_; - - /// Average and min/max time spent processing the footer by each split. - RuntimeProfile::SummaryStatsCounter* process_footer_timer_stats_; - - /// Number of columns that need to be read. - RuntimeProfile::Counter* num_cols_counter_; - - /// Number of row groups that are skipped because of Parquet row group statistics. - RuntimeProfile::Counter* num_stats_filtered_row_groups_counter_; - - /// Number of row groups that need to be read. - RuntimeProfile::Counter* num_row_groups_counter_; - - /// Number of scanners that end up doing no reads because their splits don't overlap - /// with the midpoint of any row-group in the file. - RuntimeProfile::Counter* num_scanners_with_no_reads_counter_; - - /// Number of row groups skipped due to dictionary filter - RuntimeProfile::Counter* num_dict_filtered_row_groups_counter_; - - /// Number of collection items read in current row batch. It is a scanner-local counter - /// used to reduce the frequency of updating HdfsScanNode counter. It is updated by the - /// callees of AssembleRows() and is merged into the HdfsScanNode counter at the end of - /// AssembleRows() and then is reset to 0. - int64_t coll_items_read_counter_; - - typedef int (*ProcessScratchBatchFn)(HdfsParquetScanner*, RowBatch*); - /// The codegen'd version of ProcessScratchBatch() if available, NULL otherwise. - ProcessScratchBatchFn codegend_process_scratch_batch_fn_; - - const char* filename() const { return metadata_range_->file(); } - - virtual Status GetNextInternal(RowBatch* row_batch) WARN_UNUSED_RESULT; - - /// Evaluates the min/max predicates of the 'scan_node_' using the parquet::Statistics - /// of 'row_group'. 'file_metadata' is used to determine the ordering that was used to - /// compute the statistics. Sets 'skip_row_group' to true if the row group can be - /// skipped, 'false' otherwise. - Status EvaluateStatsConjuncts(const parquet::FileMetaData& file_metadata, - const parquet::RowGroup& row_group, bool* skip_row_group) WARN_UNUSED_RESULT; - - /// Advances 'row_group_idx_' to the next non-empty row group and initializes - /// the column readers to scan it. Recoverable errors are logged to the runtime - /// state. Only returns a non-OK status if a non-recoverable error is encountered - /// (or abort_on_error is true). If OK is returned, 'parse_status_' is guaranteed - /// to be OK as well. - Status NextRowGroup() WARN_UNUSED_RESULT; - - /// Reads data using 'column_readers' to materialize top-level tuples into 'row_batch'. - /// Returns a non-OK status if a non-recoverable error was encountered and execution - /// of this query should be terminated immediately. - /// May set *skip_row_group to indicate that the current row group should be skipped, - /// e.g., due to a parse error, but execution should continue. - Status AssembleRows(const std::vector<ParquetColumnReader*>& column_readers, - RowBatch* row_batch, bool* skip_row_group) WARN_UNUSED_RESULT; - - /// Commit num_rows to the given row batch. - /// Returns OK if the query is not cancelled and hasn't exceeded any mem limits. - /// Scanner can call this with 0 rows to flush any pending resources (attached pools - /// and io buffers) to minimize memory consumption. - Status CommitRows(RowBatch* dst_batch, int num_rows) WARN_UNUSED_RESULT; - - /// Evaluates runtime filters and conjuncts (if any) against the tuples in - /// 'scratch_batch_', and adds the surviving tuples to the given batch. - /// Transfers the ownership of tuple memory to the target batch when the - /// scratch batch is exhausted. - /// Returns the number of rows that should be committed to the given batch. - int TransferScratchTuples(RowBatch* dst_batch); - - /// Processes a single row batch for TransferScratchTuples, looping over scratch_batch_ - /// until it is exhausted or the output is full. Called for the case when there are - /// materialized tuples. This is a separate function so it can be codegened. - int ProcessScratchBatch(RowBatch* dst_batch); - - /// Reads data using 'column_readers' to materialize the tuples of a CollectionValue - /// allocated from 'coll_value_builder'. Increases 'coll_items_read_counter_' by the - /// number of items in this collection and descendant collections. - /// - /// 'new_collection_rep_level' indicates when the end of the collection has been - /// reached, namely when current_rep_level <= new_collection_rep_level. - /// - /// Returns true when the end of the current collection is reached, and execution can - /// be safely resumed. - /// Returns false if execution should be aborted due to: - /// - parse_error_ is set - /// - query is cancelled - /// - scan node limit was reached - /// When false is returned the column_readers are left in an undefined state and - /// execution should be aborted immediately by the caller. - bool AssembleCollection(const std::vector<ParquetColumnReader*>& column_readers, - int new_collection_rep_level, CollectionValueBuilder* coll_value_builder); - - /// Function used by AssembleCollection() to materialize a single collection item - /// into 'tuple'. Returns false if execution should be aborted for some reason, - /// otherwise returns true. - /// If 'materialize_tuple' is false, only advances the column readers' levels, - /// and does not read any data values. - inline bool ReadCollectionItem(const std::vector<ParquetColumnReader*>& column_readers, - bool materialize_tuple, MemPool* pool, Tuple* tuple) const; - - /// Process the file footer and parse file_metadata_. This should be called with the - /// last FOOTER_SIZE bytes in context_. - Status ProcessFooter() WARN_UNUSED_RESULT; - - /// Populates 'column_readers' for the slots in 'tuple_desc', including creating child - /// readers for any collections. Schema resolution is handled in this function as - /// well. Fills in the appropriate template tuple slot with NULL for any materialized - /// fields missing in the file. - Status CreateColumnReaders(const TupleDescriptor& tuple_desc, - const ParquetSchemaResolver& schema_resolver, - std::vector<ParquetColumnReader*>* column_readers) WARN_UNUSED_RESULT; - - /// Returns the total number of scalar column readers in 'column_readers', including - /// the children of collection readers. - int CountScalarColumns(const std::vector<ParquetColumnReader*>& column_readers); - - /// Creates a column reader that reads one value for each item in the table or - /// collection element corresponding to 'parent_path'. 'parent_path' should point to - /// either a collection element or the root schema (i.e. empty path). The returned - /// reader has no slot desc associated with it, meaning only NextLevels() and not - /// ReadValue() can be called on it. - /// - /// This is used for counting item values, rather than materializing any values. For - /// example, in a count(*) over a collection, there are no values to materialize, but we - /// still need to iterate over every item in the collection to count them. - Status CreateCountingReader(const SchemaPath& parent_path, - const ParquetSchemaResolver& schema_resolver, - ParquetColumnReader** reader) - WARN_UNUSED_RESULT; - - /// Walks file_metadata_ and initiates reading the materialized columns. This - /// initializes 'scalar_readers_' and divides reservation between the columns but - /// does not start any scan ranges. - Status InitScalarColumns() WARN_UNUSED_RESULT; - - /// Decides how to divide stream_->reservation() between the columns. May increase - /// the reservation if more reservation would enable more efficient I/O for the - /// current columns being scanned. Sets the reservation on each corresponding reader - /// in 'column_readers'. - Status DivideReservationBetweenColumns( - const std::vector<BaseScalarColumnReader*>& column_readers); - - /// Compute the ideal reservation to scan a file with scan range lengths - /// 'col_range_lengths' given the min and max buffer size of the singleton DiskIoMgr - /// in ExecEnv. - static int64_t ComputeIdealReservation(const std::vector<int64_t>& col_range_lengths); - - /// Helper for DivideReservationBetweenColumns(). Implements the core algorithm for - /// dividing a reservation of 'reservation_to_distribute' bytes between columns with - /// scan range lengths 'col_range_lengths' given a min and max buffer size. Returns - /// a vector with an entry per column with the index into 'col_range_lengths' and the - /// amount of reservation in bytes to give to that column. - static std::vector<std::pair<int, int64_t>> DivideReservationBetweenColumnsHelper( - int64_t min_buffer_size, int64_t max_buffer_size, - const std::vector<int64_t>& col_range_lengths, int64_t reservation_to_distribute); - - /// Initializes the column readers in collection_readers_. - void InitCollectionColumns(); - - /// Initialize dictionaries for all column readers - Status InitDictionaries(const std::vector<BaseScalarColumnReader*>& column_readers) - WARN_UNUSED_RESULT; - - /// Performs some validation once we've reached the end of a row group to help detect - /// bugs or bad input files. - Status ValidateEndOfRowGroup(const std::vector<ParquetColumnReader*>& column_readers, - int row_group_idx, int64_t rows_read) WARN_UNUSED_RESULT; - - /// Part of the HdfsScanner interface, not used in Parquet. - Status InitNewRange() WARN_UNUSED_RESULT { return Status::OK(); } - - /// Transfers the remaining resources backing tuples such as IO buffers and memory - /// from mem pools to the given row batch. Closes all column readers. - /// Should be called after completing a row group and when returning the last batch. - void FlushRowGroupResources(RowBatch* row_batch); - - /// Releases resources associated with a row group that was skipped and closes all - /// column readers. Should be called after skipping a row group from which no rows - /// were returned. - void ReleaseSkippedRowGroupResources(); - - /// Evaluates whether the column reader is eligible for dictionary predicates - bool IsDictFilterable(ParquetColumnReader* col_reader); - - /// Evaluates whether the column reader is eligible for dictionary predicates. - bool IsDictFilterable(BaseScalarColumnReader* col_reader); - - /// Partitions the readers into scalar and collection readers. The collection readers - /// are flattened into collection_readers_. The scalar readers are partitioned into - /// dict_filterable_readers_ and non_dict_filterable_readers_ depending on whether - /// dictionary filtering is enabled and the reader can be dictionary filtered. All - /// scalar readers are also flattened into scalar_readers_. - void PartitionReaders(const vector<ParquetColumnReader*>& readers, - bool can_eval_dict_filters); - - /// Divides the column readers into dict_filterable_readers_, - /// non_dict_filterable_readers_ and collection_readers_. Allocates memory for - /// dict_filter_tuple_map_. - Status InitDictFilterStructures() WARN_UNUSED_RESULT; - - /// Returns true if all of the data pages in the column chunk are dictionary encoded - bool IsDictionaryEncoded(const parquet::ColumnMetaData& col_metadata); - - /// Checks to see if this row group can be eliminated based on applying conjuncts - /// to the dictionary values. Specifically, if any dictionary-encoded column has - /// no values that pass the relevant conjuncts, then the row group can be skipped. - Status EvalDictionaryFilters(const parquet::RowGroup& row_group, - bool* skip_row_group) WARN_UNUSED_RESULT; -}; - -} // namespace impala - -#endif
http://git-wip-us.apache.org/repos/asf/impala/blob/07fd3320/be/src/exec/hdfs-parquet-table-writer.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-parquet-table-writer.cc b/be/src/exec/hdfs-parquet-table-writer.cc deleted file mode 100644 index dd60efd..0000000 --- a/be/src/exec/hdfs-parquet-table-writer.cc +++ /dev/null @@ -1,1321 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "exec/hdfs-parquet-table-writer.h" - -#include <boost/unordered_set.hpp> - -#include "common/version.h" -#include "exec/hdfs-table-sink.h" -#include "exec/parquet-column-stats.inline.h" -#include "exprs/scalar-expr.h" -#include "exprs/scalar-expr-evaluator.h" -#include "rpc/thrift-util.h" -#include "runtime/decimal-value.h" -#include "runtime/mem-tracker.h" -#include "runtime/raw-value.h" -#include "runtime/row-batch.h" -#include "runtime/runtime-state.h" -#include "runtime/string-value.inline.h" -#include "util/bit-stream-utils.h" -#include "util/bit-util.h" -#include "util/buffer-builder.h" -#include "util/compress.h" -#include "util/debug-util.h" -#include "util/dict-encoding.h" -#include "util/hdfs-util.h" -#include "util/string-util.h" -#include "util/rle-encoding.h" - -#include <sstream> - -#include "gen-cpp/ImpalaService_types.h" - -#include "common/names.h" -using namespace impala; -using namespace apache::thrift; - -// Managing file sizes: We need to estimate how big the files being buffered -// are in order to split them correctly in HDFS. Having a file that is too big -// will cause remote reads (parquet files are non-splittable). -// It's too expensive to compute the exact file sizes as the rows are buffered -// since the values in the current pages are only encoded/compressed when the page -// is full. Once the page is full, we encode and compress it, at which point we know -// the exact on file size. -// The current buffered pages (one for each column) can have a very poor estimate. -// To adjust for this, we aim for a slightly smaller file size than the ideal. -// -// Class that encapsulates all the state for writing a single column. This contains -// all the buffered pages as well as the metadata (e.g. byte sizes, num values, etc). -// This is intended to be created once per writer per column and reused across -// row groups. -// We currently accumulate all the data pages for an entire row group per column -// before flushing them. This can be pretty large (hundreds of MB) but we can't -// fix this without collocated files in HDFS. With collocated files, the minimum -// we'd need to buffer is 1 page per column so on the order of 1MB (although we might -// decide to buffer a few pages for better HDFS write performance). -// Pages are reused between flushes. They are created on demand as necessary and -// recycled after a flush. -// As rows come in, we accumulate the encoded values into the values_ and def_levels_ -// buffers. When we've accumulated a page worth's of data, we combine values_ and -// def_levels_ into a single buffer that would be the exact bytes (with no gaps) in -// the file. The combined buffer is compressed if compression is enabled and we -// keep the combined/compressed buffer until we need to flush the file. The -// values_ and def_levels_ are then reused for the next page. -// -// TODO: For codegen, we would codegen the AppendRow() function for each column. -// This codegen is specific to the column expr (and type) and encoding. The -// parent writer object would combine all the generated AppendRow from all -// the columns and run that function over row batches. -// TODO: we need to pass in the compression from the FE/metadata - -DECLARE_bool(enable_parquet_page_index_writing_debug_only); - -namespace impala { - -// Base class for column writers. This contains most of the logic except for -// the type specific functions which are implemented in the subclasses. -class HdfsParquetTableWriter::BaseColumnWriter { - public: - // expr - the expression to generate output values for this column. - BaseColumnWriter(HdfsParquetTableWriter* parent, ScalarExprEvaluator* expr_eval, - const THdfsCompression::type& codec) - : parent_(parent), - expr_eval_(expr_eval), - codec_(codec), - page_size_(DEFAULT_DATA_PAGE_SIZE), - current_page_(nullptr), - num_values_(0), - total_compressed_byte_size_(0), - total_uncompressed_byte_size_(0), - dict_encoder_base_(nullptr), - def_levels_(nullptr), - values_buffer_len_(DEFAULT_DATA_PAGE_SIZE), - page_stats_base_(nullptr), - row_group_stats_base_(nullptr), - table_sink_mem_tracker_(parent_->parent_->mem_tracker()) { - static_assert(std::is_same<decltype(parent_->parent_), HdfsTableSink*>::value, - "'table_sink_mem_tracker_' must point to the mem tracker of an HdfsTableSink"); - def_levels_ = parent_->state_->obj_pool()->Add( - new RleEncoder(parent_->reusable_col_mem_pool_->Allocate(DEFAULT_DATA_PAGE_SIZE), - DEFAULT_DATA_PAGE_SIZE, 1)); - values_buffer_ = parent_->reusable_col_mem_pool_->Allocate(values_buffer_len_); - } - - virtual ~BaseColumnWriter() {} - - // Called after the constructor to initialize the column writer. - Status Init() WARN_UNUSED_RESULT { - Reset(); - RETURN_IF_ERROR(Codec::CreateCompressor(nullptr, false, codec_, &compressor_)); - return Status::OK(); - } - - // Appends the row to this column. This buffers the value into a data page. Returns - // error if the space needed for the encoded value is larger than the data page size. - // TODO: this needs to be batch based, instead of row based for better performance. - // This is a bit trickier to handle the case where only a partial row batch can be - // output to the current file because it reaches the max file size. Enabling codegen - // would also solve this problem. - Status AppendRow(TupleRow* row) WARN_UNUSED_RESULT; - - // Flushes all buffered data pages to the file. - // *file_pos is an output parameter and will be incremented by - // the number of bytes needed to write all the data pages for this column. - // first_data_page and first_dictionary_page are also out parameters and - // will contain the byte offset for the data page and dictionary page. They - // will be set to -1 if the column does not contain that type of page. - Status Flush(int64_t* file_pos, int64_t* first_data_page, - int64_t* first_dictionary_page) WARN_UNUSED_RESULT; - - // Materializes the column statistics to the per-file MemPool so they are available - // after their row batch buffer has been freed. - Status MaterializeStatsValues() WARN_UNUSED_RESULT { - RETURN_IF_ERROR(row_group_stats_base_->MaterializeStringValuesToInternalBuffers()); - RETURN_IF_ERROR(page_stats_base_->MaterializeStringValuesToInternalBuffers()); - return Status::OK(); - } - - // Encodes the row group statistics into a parquet::Statistics object and attaches it to - // 'meta_data'. - void EncodeRowGroupStats(parquet::ColumnMetaData* meta_data) { - DCHECK(row_group_stats_base_ != nullptr); - if (row_group_stats_base_->BytesNeeded() <= MAX_COLUMN_STATS_SIZE) { - row_group_stats_base_->EncodeToThrift(&meta_data->statistics); - meta_data->__isset.statistics = true; - } - } - - // Resets all the data accumulated for this column. Memory can now be reused for - // the next row group. - // Any data for previous row groups must be reset (e.g. dictionaries). - // Subclasses must call this if they override this function. - virtual void Reset() { - num_values_ = 0; - total_compressed_byte_size_ = 0; - current_encoding_ = parquet::Encoding::PLAIN; - next_page_encoding_ = parquet::Encoding::PLAIN; - pages_.clear(); - current_page_ = nullptr; - column_encodings_.clear(); - dict_encoding_stats_.clear(); - data_encoding_stats_.clear(); - // Repetition/definition level encodings are constant. Incorporate them here. - column_encodings_.insert(parquet::Encoding::RLE); - offset_index_.page_locations.clear(); - column_index_.null_pages.clear(); - column_index_.min_values.clear(); - column_index_.max_values.clear(); - table_sink_mem_tracker_->Release(page_index_memory_consumption_); - page_index_memory_consumption_ = 0; - column_index_.null_counts.clear(); - valid_column_index_ = true; - } - - // Close this writer. This is only called after Flush() and no more rows will - // be added. - void Close() { - if (compressor_.get() != nullptr) compressor_->Close(); - if (dict_encoder_base_ != nullptr) dict_encoder_base_->Close(); - // We must release the memory consumption of this column writer. - table_sink_mem_tracker_->Release(page_index_memory_consumption_); - page_index_memory_consumption_ = 0; - } - - const ColumnType& type() const { return expr_eval_->root().type(); } - uint64_t num_values() const { return num_values_; } - uint64_t total_compressed_size() const { return total_compressed_byte_size_; } - uint64_t total_uncompressed_size() const { return total_uncompressed_byte_size_; } - parquet::CompressionCodec::type GetParquetCodec() const { - return ConvertImpalaToParquetCodec(codec_); - } - - protected: - friend class HdfsParquetTableWriter; - - Status AddMemoryConsumptionForPageIndex(int64_t new_memory_allocation) { - if (UNLIKELY(!table_sink_mem_tracker_->TryConsume(new_memory_allocation))) { - return table_sink_mem_tracker_->MemLimitExceeded(parent_->state_, - "Failed to allocate memory for Parquet page index.", new_memory_allocation); - } - page_index_memory_consumption_ += new_memory_allocation; - return Status::OK(); - } - - Status ReserveOffsetIndex(int64_t capacity) { - if (!FLAGS_enable_parquet_page_index_writing_debug_only) return Status::OK(); - RETURN_IF_ERROR( - AddMemoryConsumptionForPageIndex(capacity * sizeof(parquet::PageLocation))); - offset_index_.page_locations.reserve(capacity); - return Status::OK(); - } - - void AddLocationToOffsetIndex(const parquet::PageLocation& location) { - if (!FLAGS_enable_parquet_page_index_writing_debug_only) return; - offset_index_.page_locations.push_back(location); - } - - Status AddPageStatsToColumnIndex() { - if (!FLAGS_enable_parquet_page_index_writing_debug_only) return Status::OK(); - parquet::Statistics page_stats; - page_stats_base_->EncodeToThrift(&page_stats); - // If pages_stats contains min_value and max_value, then append them to min_values_ - // and max_values_ and also mark the page as not null. In case min and max values are - // not set, push empty strings to maintain the consistency of the index and mark the - // page as null. Always push the null_count. - string min_val; - string max_val; - if ((page_stats.__isset.min_value) && (page_stats.__isset.max_value)) { - Status s_min = TruncateDown(page_stats.min_value, PAGE_INDEX_MAX_STRING_LENGTH, - &min_val); - Status s_max = TruncateUp(page_stats.max_value, PAGE_INDEX_MAX_STRING_LENGTH, - &max_val); - if (!s_min.ok() || !s_max.ok()) valid_column_index_ = false; - column_index_.null_pages.push_back(false); - } else { - DCHECK(!page_stats.__isset.min_value && !page_stats.__isset.max_value); - column_index_.null_pages.push_back(true); - DCHECK_EQ(page_stats.null_count, num_values_); - } - RETURN_IF_ERROR( - AddMemoryConsumptionForPageIndex(min_val.capacity() + max_val.capacity())); - column_index_.min_values.emplace_back(std::move(min_val)); - column_index_.max_values.emplace_back(std::move(max_val)); - column_index_.null_counts.push_back(page_stats.null_count); - return Status::OK(); - } - - // Encodes value into the current page output buffer and updates the column statistics - // aggregates. Returns true if the value was appended successfully to the current page. - // Returns false if the value was not appended to the current page and the caller can - // create a new page and try again with the same value. May change - // 'next_page_encoding_' if the encoding for the next page should be different - e.g. - // if a dictionary overflowed and dictionary encoding is no longer viable. - // *bytes_needed will contain the (estimated) number of bytes needed to successfully - // encode the value in the page. - // Implemented in the subclass. - virtual bool ProcessValue(void* value, int64_t* bytes_needed) WARN_UNUSED_RESULT = 0; - - // Encodes out all data for the current page and updates the metadata. - virtual Status FinalizeCurrentPage() WARN_UNUSED_RESULT; - - // Update current_page_ to a new page, reusing pages allocated if possible. - void NewPage(); - - // Writes out the dictionary encoded data buffered in dict_encoder_. - void WriteDictDataPage(); - - struct DataPage { - // Page header. This is a union of all page types. - parquet::PageHeader header; - - // Number of bytes needed to store definition levels. - int num_def_bytes; - - // This is the payload for the data page. This includes the definition/repetition - // levels data and the encoded values. If compression is enabled, this is the - // compressed data. - uint8_t* data; - - // If true, this data page has been finalized. All sizes are computed, header is - // fully populated and any compression is done. - bool finalized; - - // Number of non-null values - int num_non_null; - }; - - HdfsParquetTableWriter* parent_; - ScalarExprEvaluator* expr_eval_; - - THdfsCompression::type codec_; - - // Compression codec for this column. If nullptr, this column is will not be - // compressed. - scoped_ptr<Codec> compressor_; - - // Size of newly created pages. Defaults to DEFAULT_DATA_PAGE_SIZE and is increased - // when pages are not big enough. This only happens when there are enough unique values - // such that we switch from PLAIN_DICTIONARY to PLAIN encoding and then have very - // large values (i.e. greater than DEFAULT_DATA_PAGE_SIZE). - // TODO: Consider removing and only creating a single large page as necessary. - int64_t page_size_; - - // Pages belong to this column chunk. We need to keep them in memory in order to write - // them together. - vector<DataPage> pages_; - - // Pointer to the current page in 'pages_'. Not owned. - DataPage* current_page_; - - // Total number of values across all pages, including NULL. - int64_t num_values_; - int64_t total_compressed_byte_size_; - int64_t total_uncompressed_byte_size_; - // Encoding of the current page. - parquet::Encoding::type current_encoding_; - // Encoding to use for the next page. By default, the same as 'current_encoding_'. - // Used by the column writer to switch encoding while writing a column, e.g. if the - // dictionary overflows. - parquet::Encoding::type next_page_encoding_; - - // Set of all encodings used in the column chunk - unordered_set<parquet::Encoding::type> column_encodings_; - - // Map from the encoding to the number of pages in the column chunk with this encoding - // These are used to construct the PageEncodingStats, which provide information - // about encoding usage for each different page type. Currently, only dictionary - // and data pages are used. - unordered_map<parquet::Encoding::type, int> dict_encoding_stats_; - unordered_map<parquet::Encoding::type, int> data_encoding_stats_; - - // Created, owned, and set by the derived class. - DictEncoderBase* dict_encoder_base_; - - // Rle encoder object for storing definition levels, owned by instances of this class. - // For non-nested schemas, this always uses 1 bit per row. This is reused across pages - // since the underlying buffer is copied out when the page is finalized. - RleEncoder* def_levels_; - - // Data for buffered values. This is owned by instances of this class and gets reused - // across pages. - uint8_t* values_buffer_; - // The size of values_buffer_. - int values_buffer_len_; - - // Pointers to statistics, created, owned, and set by the derived class. - ColumnStatsBase* page_stats_base_; - ColumnStatsBase* row_group_stats_base_; - - // OffsetIndex stores the locations of the pages. - parquet::OffsetIndex offset_index_; - - // ColumnIndex stores the statistics of the pages. - parquet::ColumnIndex column_index_; - - // Pointer to the HdfsTableSink's MemTracker. - MemTracker* table_sink_mem_tracker_; - - // Memory consumption of the min/max values in the page index. - int64_t page_index_memory_consumption_ = 0; - - // Only write ColumnIndex when 'valid_column_index_' is true. We always need to write - // the OffsetIndex though. - bool valid_column_index_ = true; -}; - -// Per type column writer. -template<typename T> -class HdfsParquetTableWriter::ColumnWriter : - public HdfsParquetTableWriter::BaseColumnWriter { - public: - ColumnWriter(HdfsParquetTableWriter* parent, ScalarExprEvaluator* eval, - const THdfsCompression::type& codec) - : BaseColumnWriter(parent, eval, codec), - num_values_since_dict_size_check_(0), - plain_encoded_value_size_( - ParquetPlainEncoder::EncodedByteSize(eval->root().type())) { - DCHECK_NE(eval->root().type().type, TYPE_BOOLEAN); - // IMPALA-7304: Don't write column index for floating-point columns until - // PARQUET-1222 is resolved. - if (std::is_floating_point<T>::value) valid_column_index_ = false; - } - - virtual void Reset() { - BaseColumnWriter::Reset(); - // IMPALA-7304: Don't write column index for floating-point columns until - // PARQUET-1222 is resolved. - if (std::is_floating_point<T>::value) valid_column_index_ = false; - // Default to dictionary encoding. If the cardinality ends up being too high, - // it will fall back to plain. - current_encoding_ = parquet::Encoding::PLAIN_DICTIONARY; - next_page_encoding_ = parquet::Encoding::PLAIN_DICTIONARY; - dict_encoder_.reset( - new DictEncoder<T>(parent_->per_file_mem_pool_.get(), plain_encoded_value_size_, - parent_->parent_->mem_tracker())); - dict_encoder_base_ = dict_encoder_.get(); - page_stats_.reset( - new ColumnStats<T>(parent_->per_file_mem_pool_.get(), plain_encoded_value_size_)); - page_stats_base_ = page_stats_.get(); - row_group_stats_.reset( - new ColumnStats<T>(parent_->per_file_mem_pool_.get(), plain_encoded_value_size_)); - row_group_stats_base_ = row_group_stats_.get(); - } - - protected: - virtual bool ProcessValue(void* value, int64_t* bytes_needed) { - if (current_encoding_ == parquet::Encoding::PLAIN_DICTIONARY) { - if (UNLIKELY(num_values_since_dict_size_check_ >= - DICTIONARY_DATA_PAGE_SIZE_CHECK_PERIOD)) { - num_values_since_dict_size_check_ = 0; - if (dict_encoder_->EstimatedDataEncodedSize() >= page_size_) return false; - } - ++num_values_since_dict_size_check_; - *bytes_needed = dict_encoder_->Put(*CastValue(value)); - // If the dictionary contains the maximum number of values, switch to plain - // encoding for the next page. The current page is full and must be written out. - if (UNLIKELY(*bytes_needed < 0)) { - next_page_encoding_ = parquet::Encoding::PLAIN; - return false; - } - parent_->file_size_estimate_ += *bytes_needed; - } else if (current_encoding_ == parquet::Encoding::PLAIN) { - T* v = CastValue(value); - *bytes_needed = plain_encoded_value_size_ < 0 ? - ParquetPlainEncoder::ByteSize<T>(*v) : - plain_encoded_value_size_; - if (current_page_->header.uncompressed_page_size + *bytes_needed > page_size_) { - return false; - } - uint8_t* dst_ptr = values_buffer_ + current_page_->header.uncompressed_page_size; - int64_t written_len = - ParquetPlainEncoder::Encode(*v, plain_encoded_value_size_, dst_ptr); - DCHECK_EQ(*bytes_needed, written_len); - current_page_->header.uncompressed_page_size += written_len; - } else { - // TODO: support other encodings here - DCHECK(false); - } - page_stats_->Update(*CastValue(value)); - return true; - } - - private: - // The period, in # of rows, to check the estimated dictionary page size against - // the data page size. We want to start a new data page when the estimated size - // is at least that big. The estimated size computation is not very cheap and - // we can tolerate going over the data page size by some amount. - // The expected byte size per dictionary value is < 1B and at most 2 bytes so the - // error is pretty low. - // TODO: is there a better way? - static const int DICTIONARY_DATA_PAGE_SIZE_CHECK_PERIOD = 100; - - // Encoder for dictionary encoding for different columns. Only one is set. - scoped_ptr<DictEncoder<T>> dict_encoder_; - - // The number of values added since we last checked the dictionary. - int num_values_since_dict_size_check_; - - // Size of each encoded value in plain encoding. -1 if the type is variable-length. - int64_t plain_encoded_value_size_; - - // Temporary string value to hold CHAR(N) - StringValue temp_; - - // Tracks statistics per page. These are written out to the page index. - scoped_ptr<ColumnStats<T>> page_stats_; - - // Tracks statistics per row group. This gets reset when starting a new row group. - scoped_ptr<ColumnStats<T>> row_group_stats_; - - // Converts a slot pointer to a raw value suitable for encoding - inline T* CastValue(void* value) { - return reinterpret_cast<T*>(value); - } -}; - -template<> -inline StringValue* HdfsParquetTableWriter::ColumnWriter<StringValue>::CastValue( - void* value) { - if (type().type == TYPE_CHAR) { - temp_.ptr = reinterpret_cast<char*>(value); - temp_.len = StringValue::UnpaddedCharLength(temp_.ptr, type().len); - return &temp_; - } - return reinterpret_cast<StringValue*>(value); -} - -// Bools are encoded a bit differently so subclass it explicitly. -class HdfsParquetTableWriter::BoolColumnWriter : - public HdfsParquetTableWriter::BaseColumnWriter { - public: - BoolColumnWriter(HdfsParquetTableWriter* parent, ScalarExprEvaluator* eval, - const THdfsCompression::type& codec) - : BaseColumnWriter(parent, eval, codec), - page_stats_(parent_->reusable_col_mem_pool_.get(), -1), - row_group_stats_(parent_->reusable_col_mem_pool_.get(), -1) { - DCHECK_EQ(eval->root().type().type, TYPE_BOOLEAN); - bool_values_ = parent_->state_->obj_pool()->Add( - new BitWriter(values_buffer_, values_buffer_len_)); - // Dictionary encoding doesn't make sense for bools and is not allowed by - // the format. - current_encoding_ = parquet::Encoding::PLAIN; - dict_encoder_base_ = nullptr; - - page_stats_base_ = &page_stats_; - row_group_stats_base_ = &row_group_stats_; - } - - protected: - virtual bool ProcessValue(void* value, int64_t* bytes_needed) { - bool v = *reinterpret_cast<bool*>(value); - if (!bool_values_->PutValue(v, 1)) return false; - page_stats_.Update(v); - return true; - } - - virtual Status FinalizeCurrentPage() { - DCHECK(current_page_ != nullptr); - if (current_page_->finalized) return Status::OK(); - bool_values_->Flush(); - int num_bytes = bool_values_->bytes_written(); - current_page_->header.uncompressed_page_size += num_bytes; - // Call into superclass to handle the rest. - RETURN_IF_ERROR(BaseColumnWriter::FinalizeCurrentPage()); - bool_values_->Clear(); - return Status::OK(); - } - - private: - // Used to encode bools as single bit values. This is reused across pages. - BitWriter* bool_values_; - - // Tracks statistics per page. These are written out to the page index. - ColumnStats<bool> page_stats_; - - // Tracks statistics per row group. This gets reset when starting a new file. - ColumnStats<bool> row_group_stats_; -}; - -} - -inline Status HdfsParquetTableWriter::BaseColumnWriter::AppendRow(TupleRow* row) { - ++num_values_; - void* value = expr_eval_->GetValue(row); - if (current_page_ == nullptr) NewPage(); - - // Ensure that we have enough space for the definition level, but don't write it yet in - // case we don't have enough space for the value. - if (def_levels_->buffer_full()) { - RETURN_IF_ERROR(FinalizeCurrentPage()); - NewPage(); - } - - // Encoding may fail for several reasons - because the current page is not big enough, - // because we've encoded the maximum number of unique dictionary values and need to - // switch to plain encoding, etc. so we may need to try again more than once. - // TODO: Have a clearer set of state transitions here, to make it easier to see that - // this won't loop forever. - while (true) { - // Nulls don't get encoded. Increment the null count of the parquet statistics. - if (value == nullptr) { - DCHECK(page_stats_base_ != nullptr); - page_stats_base_->IncrementNullCount(1); - break; - } - - int64_t bytes_needed = 0; - if (ProcessValue(value, &bytes_needed)) { - ++current_page_->num_non_null; - break; // Succesfully appended, don't need to retry. - } - - // Value didn't fit on page, try again on a new page. - RETURN_IF_ERROR(FinalizeCurrentPage()); - - // Check how much space is needed to write this value. If that is larger than the - // page size then increase page size and try again. - if (UNLIKELY(bytes_needed > page_size_)) { - if (bytes_needed > MAX_DATA_PAGE_SIZE) { - stringstream ss; - ss << "Cannot write value of size " - << PrettyPrinter::Print(bytes_needed, TUnit::BYTES) << " bytes to a Parquet " - << "data page that exceeds the max page limit " - << PrettyPrinter::Print(MAX_DATA_PAGE_SIZE , TUnit::BYTES) << "."; - return Status(ss.str()); - } - page_size_ = bytes_needed; - values_buffer_len_ = page_size_; - values_buffer_ = parent_->reusable_col_mem_pool_->Allocate(values_buffer_len_); - } - NewPage(); - } - - // Now that the value has been successfully written, write the definition level. - bool ret = def_levels_->Put(value != nullptr); - // Writing the def level will succeed because we ensured there was enough space for it - // above, and new pages will always have space for at least a single def level. - DCHECK(ret); - - ++current_page_->header.data_page_header.num_values; - return Status::OK(); -} - -inline void HdfsParquetTableWriter::BaseColumnWriter::WriteDictDataPage() { - DCHECK(dict_encoder_base_ != nullptr); - DCHECK_EQ(current_page_->header.uncompressed_page_size, 0); - if (current_page_->num_non_null == 0) return; - int len = dict_encoder_base_->WriteData(values_buffer_, values_buffer_len_); - while (UNLIKELY(len < 0)) { - // len < 0 indicates the data doesn't fit into a data page. Allocate a larger data - // page. - values_buffer_len_ *= 2; - values_buffer_ = parent_->reusable_col_mem_pool_->Allocate(values_buffer_len_); - len = dict_encoder_base_->WriteData(values_buffer_, values_buffer_len_); - } - dict_encoder_base_->ClearIndices(); - current_page_->header.uncompressed_page_size = len; -} - -Status HdfsParquetTableWriter::BaseColumnWriter::Flush(int64_t* file_pos, - int64_t* first_data_page, int64_t* first_dictionary_page) { - if (current_page_ == nullptr) { - // This column/file is empty - *first_data_page = *file_pos; - *first_dictionary_page = -1; - return Status::OK(); - } - - RETURN_IF_ERROR(FinalizeCurrentPage()); - - *first_dictionary_page = -1; - // First write the dictionary page before any of the data pages. - if (dict_encoder_base_ != nullptr) { - *first_dictionary_page = *file_pos; - // Write dictionary page header - parquet::DictionaryPageHeader dict_header; - dict_header.num_values = dict_encoder_base_->num_entries(); - dict_header.encoding = parquet::Encoding::PLAIN_DICTIONARY; - ++dict_encoding_stats_[dict_header.encoding]; - - parquet::PageHeader header; - header.type = parquet::PageType::DICTIONARY_PAGE; - header.uncompressed_page_size = dict_encoder_base_->dict_encoded_size(); - header.__set_dictionary_page_header(dict_header); - - // Write the dictionary page data, compressing it if necessary. - uint8_t* dict_buffer = parent_->per_file_mem_pool_->Allocate( - header.uncompressed_page_size); - dict_encoder_base_->WriteDict(dict_buffer); - if (compressor_.get() != nullptr) { - SCOPED_TIMER(parent_->parent_->compress_timer()); - int64_t max_compressed_size = - compressor_->MaxOutputLen(header.uncompressed_page_size); - DCHECK_GT(max_compressed_size, 0); - uint8_t* compressed_data = - parent_->per_file_mem_pool_->Allocate(max_compressed_size); - header.compressed_page_size = max_compressed_size; - RETURN_IF_ERROR(compressor_->ProcessBlock32(true, header.uncompressed_page_size, - dict_buffer, &header.compressed_page_size, &compressed_data)); - dict_buffer = compressed_data; - // We allocated the output based on the guessed size, return the extra allocated - // bytes back to the mem pool. - parent_->per_file_mem_pool_->ReturnPartialAllocation( - max_compressed_size - header.compressed_page_size); - } else { - header.compressed_page_size = header.uncompressed_page_size; - } - - uint8_t* header_buffer; - uint32_t header_len; - RETURN_IF_ERROR(parent_->thrift_serializer_->SerializeToBuffer( - &header, &header_len, &header_buffer)); - RETURN_IF_ERROR(parent_->Write(header_buffer, header_len)); - *file_pos += header_len; - total_compressed_byte_size_ += header_len; - total_uncompressed_byte_size_ += header_len; - - RETURN_IF_ERROR(parent_->Write(dict_buffer, header.compressed_page_size)); - *file_pos += header.compressed_page_size; - total_compressed_byte_size_ += header.compressed_page_size; - total_uncompressed_byte_size_ += header.uncompressed_page_size; - } - - *first_data_page = *file_pos; - int64_t current_row_group_index = 0; - RETURN_IF_ERROR(ReserveOffsetIndex(pages_.size())); - - // Write data pages - for (const DataPage& page : pages_) { - parquet::PageLocation location; - - if (page.header.data_page_header.num_values == 0) { - // Skip empty pages - location.offset = -1; - location.compressed_page_size = 0; - location.first_row_index = -1; - AddLocationToOffsetIndex(location); - continue; - } - - location.offset = *file_pos; - location.first_row_index = current_row_group_index; - - // Write data page header - uint8_t* buffer = nullptr; - uint32_t len = 0; - RETURN_IF_ERROR( - parent_->thrift_serializer_->SerializeToBuffer(&page.header, &len, &buffer)); - RETURN_IF_ERROR(parent_->Write(buffer, len)); - *file_pos += len; - - // Note that the namings are confusing here: - // parquet::PageHeader::compressed_page_size is the compressed page size in bytes, as - // its name suggests. On the other hand, parquet::PageLocation::compressed_page_size - // also includes the size of the page header. - location.compressed_page_size = page.header.compressed_page_size + len; - AddLocationToOffsetIndex(location); - - // Write the page data - RETURN_IF_ERROR(parent_->Write(page.data, page.header.compressed_page_size)); - *file_pos += page.header.compressed_page_size; - current_row_group_index += page.header.data_page_header.num_values; - } - return Status::OK(); -} - -Status HdfsParquetTableWriter::BaseColumnWriter::FinalizeCurrentPage() { - DCHECK(current_page_ != nullptr); - if (current_page_->finalized) return Status::OK(); - - // If the entire page was NULL, encode it as PLAIN since there is no - // data anyway. We don't output a useless dictionary page and it works - // around a parquet MR bug (see IMPALA-759 for more details). - if (current_page_->num_non_null == 0) current_encoding_ = parquet::Encoding::PLAIN; - - if (current_encoding_ == parquet::Encoding::PLAIN_DICTIONARY) WriteDictDataPage(); - - parquet::PageHeader& header = current_page_->header; - header.data_page_header.encoding = current_encoding_; - - // Accumulate encoding statistics - column_encodings_.insert(header.data_page_header.encoding); - ++data_encoding_stats_[header.data_page_header.encoding]; - - // Compute size of definition bits - def_levels_->Flush(); - current_page_->num_def_bytes = sizeof(int32_t) + def_levels_->len(); - header.uncompressed_page_size += current_page_->num_def_bytes; - - // At this point we know all the data for the data page. Combine them into one buffer. - uint8_t* uncompressed_data = nullptr; - if (compressor_.get() == nullptr) { - uncompressed_data = - parent_->per_file_mem_pool_->Allocate(header.uncompressed_page_size); - } else { - // We have compression. Combine into the staging buffer. - parent_->compression_staging_buffer_.resize( - header.uncompressed_page_size); - uncompressed_data = &parent_->compression_staging_buffer_[0]; - } - - BufferBuilder buffer(uncompressed_data, header.uncompressed_page_size); - - // Copy the definition (null) data - int num_def_level_bytes = def_levels_->len(); - - buffer.Append(num_def_level_bytes); - buffer.Append(def_levels_->buffer(), num_def_level_bytes); - // TODO: copy repetition data when we support nested types. - buffer.Append(values_buffer_, buffer.capacity() - buffer.size()); - - // Apply compression if necessary - if (compressor_.get() == nullptr) { - current_page_->data = uncompressed_data; - header.compressed_page_size = header.uncompressed_page_size; - } else { - SCOPED_TIMER(parent_->parent_->compress_timer()); - int64_t max_compressed_size = - compressor_->MaxOutputLen(header.uncompressed_page_size); - DCHECK_GT(max_compressed_size, 0); - uint8_t* compressed_data = parent_->per_file_mem_pool_->Allocate(max_compressed_size); - header.compressed_page_size = max_compressed_size; - RETURN_IF_ERROR(compressor_->ProcessBlock32(true, header.uncompressed_page_size, - uncompressed_data, &header.compressed_page_size, &compressed_data)); - current_page_->data = compressed_data; - - // We allocated the output based on the guessed size, return the extra allocated - // bytes back to the mem pool. - parent_->per_file_mem_pool_->ReturnPartialAllocation( - max_compressed_size - header.compressed_page_size); - } - - DCHECK(page_stats_base_ != nullptr); - RETURN_IF_ERROR(AddPageStatsToColumnIndex()); - - // Update row group statistics from page statistics. - DCHECK(row_group_stats_base_ != nullptr); - row_group_stats_base_->Merge(*page_stats_base_); - - // Add the size of the data page header - uint8_t* header_buffer; - uint32_t header_len = 0; - RETURN_IF_ERROR(parent_->thrift_serializer_->SerializeToBuffer( - ¤t_page_->header, &header_len, &header_buffer)); - - current_page_->finalized = true; - total_compressed_byte_size_ += header_len + header.compressed_page_size; - total_uncompressed_byte_size_ += header_len + header.uncompressed_page_size; - parent_->file_size_estimate_ += header_len + header.compressed_page_size; - def_levels_->Clear(); - return Status::OK(); -} - -void HdfsParquetTableWriter::BaseColumnWriter::NewPage() { - pages_.push_back(DataPage()); - current_page_ = &pages_.back(); - - parquet::DataPageHeader header; - header.num_values = 0; - // The code that populates the column chunk metadata's encodings field - // relies on these specific values for the definition/repetition level - // encodings. - header.definition_level_encoding = parquet::Encoding::RLE; - header.repetition_level_encoding = parquet::Encoding::RLE; - current_page_->header.__set_data_page_header(header); - current_encoding_ = next_page_encoding_; - current_page_->finalized = false; - current_page_->num_non_null = 0; - page_stats_base_->Reset(); -} - -HdfsParquetTableWriter::HdfsParquetTableWriter(HdfsTableSink* parent, RuntimeState* state, - OutputPartition* output, const HdfsPartitionDescriptor* part_desc, - const HdfsTableDescriptor* table_desc) - : HdfsTableWriter(parent, state, output, part_desc, table_desc), - thrift_serializer_(new ThriftSerializer(true)), - current_row_group_(nullptr), - row_count_(0), - file_size_limit_(0), - reusable_col_mem_pool_(new MemPool(parent_->mem_tracker())), - per_file_mem_pool_(new MemPool(parent_->mem_tracker())), - row_idx_(0) {} - -HdfsParquetTableWriter::~HdfsParquetTableWriter() { -} - -Status HdfsParquetTableWriter::Init() { - // Initialize file metadata - file_metadata_.version = PARQUET_CURRENT_VERSION; - - stringstream created_by; - created_by << "impala version " << GetDaemonBuildVersion() - << " (build " << GetDaemonBuildHash() << ")"; - file_metadata_.__set_created_by(created_by.str()); - - // Default to snappy compressed - THdfsCompression::type codec = THdfsCompression::SNAPPY; - - const TQueryOptions& query_options = state_->query_options(); - if (query_options.__isset.compression_codec) { - codec = query_options.compression_codec; - } - if (!(codec == THdfsCompression::NONE || - codec == THdfsCompression::GZIP || - codec == THdfsCompression::SNAPPY)) { - stringstream ss; - ss << "Invalid parquet compression codec " << Codec::GetCodecName(codec); - return Status(ss.str()); - } - - VLOG_FILE << "Using compression codec: " << codec; - - int num_cols = table_desc_->num_cols() - table_desc_->num_clustering_cols(); - // When opening files using the hdfsOpenFile() API, the maximum block size is limited to - // 2GB. - int64_t min_block_size = MinBlockSize(num_cols); - if (min_block_size >= numeric_limits<int32_t>::max()) { - stringstream ss; - return Status(Substitute("Minimum required block size must be less than 2GB " - "(currently $0), try reducing the number of non-partitioning columns in the " - "target table (currently $1).", - PrettyPrinter::Print(min_block_size, TUnit::BYTES), num_cols)); - } - - columns_.resize(num_cols); - // Initialize each column structure. - for (int i = 0; i < columns_.size(); ++i) { - BaseColumnWriter* writer = nullptr; - const ColumnType& type = output_expr_evals_[i]->root().type(); - switch (type.type) { - case TYPE_BOOLEAN: - writer = new BoolColumnWriter(this, output_expr_evals_[i], codec); - break; - case TYPE_TINYINT: - writer = new ColumnWriter<int8_t>(this, output_expr_evals_[i], codec); - break; - case TYPE_SMALLINT: - writer = new ColumnWriter<int16_t>(this, output_expr_evals_[i], codec); - break; - case TYPE_INT: - writer = new ColumnWriter<int32_t>(this, output_expr_evals_[i], codec); - break; - case TYPE_BIGINT: - writer = new ColumnWriter<int64_t>(this, output_expr_evals_[i], codec); - break; - case TYPE_FLOAT: - writer = new ColumnWriter<float>(this, output_expr_evals_[i], codec); - break; - case TYPE_DOUBLE: - writer = new ColumnWriter<double>(this, output_expr_evals_[i], codec); - break; - case TYPE_TIMESTAMP: - writer = new ColumnWriter<TimestampValue>( - this, output_expr_evals_[i], codec); - break; - case TYPE_VARCHAR: - case TYPE_STRING: - case TYPE_CHAR: - writer = new ColumnWriter<StringValue>(this, output_expr_evals_[i], codec); - break; - case TYPE_DECIMAL: - switch (output_expr_evals_[i]->root().type().GetByteSize()) { - case 4: - writer = new ColumnWriter<Decimal4Value>( - this, output_expr_evals_[i], codec); - break; - case 8: - writer = new ColumnWriter<Decimal8Value>( - this, output_expr_evals_[i], codec); - break; - case 16: - writer = new ColumnWriter<Decimal16Value>( - this, output_expr_evals_[i], codec); - break; - default: - DCHECK(false); - } - break; - default: - DCHECK(false); - } - columns_[i].reset(writer); - RETURN_IF_ERROR(columns_[i]->Init()); - } - RETURN_IF_ERROR(CreateSchema()); - return Status::OK(); -} - -Status HdfsParquetTableWriter::CreateSchema() { - int num_clustering_cols = table_desc_->num_clustering_cols(); - - // Create flattened tree with a single root. - file_metadata_.schema.resize(columns_.size() + 1); - file_metadata_.schema[0].__set_num_children(columns_.size()); - file_metadata_.schema[0].name = "schema"; - - for (int i = 0; i < columns_.size(); ++i) { - parquet::SchemaElement& node = file_metadata_.schema[i + 1]; - const ColumnType& type = output_expr_evals_[i]->root().type(); - node.name = table_desc_->col_descs()[i + num_clustering_cols].name(); - node.__set_type(ConvertInternalToParquetType(type.type)); - node.__set_repetition_type(parquet::FieldRepetitionType::OPTIONAL); - if (type.type == TYPE_DECIMAL) { - // This column is type decimal. Update the file metadata to include the - // additional fields: - // 1) converted_type: indicate this is really a decimal column. - // 2) type_length: the number of bytes used per decimal value in the data - // 3) precision/scale - node.__set_converted_type(parquet::ConvertedType::DECIMAL); - node.__set_type_length( - ParquetPlainEncoder::DecimalSize(output_expr_evals_[i]->root().type())); - node.__set_scale(output_expr_evals_[i]->root().type().scale); - node.__set_precision(output_expr_evals_[i]->root().type().precision); - } else if (type.type == TYPE_VARCHAR || type.type == TYPE_CHAR || - (type.type == TYPE_STRING && - state_->query_options().parquet_annotate_strings_utf8)) { - node.__set_converted_type(parquet::ConvertedType::UTF8); - } else if (type.type == TYPE_TINYINT) { - node.__set_converted_type(parquet::ConvertedType::INT_8); - } else if (type.type == TYPE_SMALLINT) { - node.__set_converted_type(parquet::ConvertedType::INT_16); - } else if (type.type == TYPE_INT) { - node.__set_converted_type(parquet::ConvertedType::INT_32); - } else if (type.type == TYPE_BIGINT) { - node.__set_converted_type(parquet::ConvertedType::INT_64); - } - } - - return Status::OK(); -} - -Status HdfsParquetTableWriter::AddRowGroup() { - if (current_row_group_ != nullptr) RETURN_IF_ERROR(FlushCurrentRowGroup()); - file_metadata_.row_groups.push_back(parquet::RowGroup()); - current_row_group_ = &file_metadata_.row_groups[file_metadata_.row_groups.size() - 1]; - - // Initialize new row group metadata. - int num_clustering_cols = table_desc_->num_clustering_cols(); - current_row_group_->columns.resize(columns_.size()); - for (int i = 0; i < columns_.size(); ++i) { - parquet::ColumnMetaData metadata; - metadata.type = ConvertInternalToParquetType(columns_[i]->type().type); - metadata.path_in_schema.push_back( - table_desc_->col_descs()[i + num_clustering_cols].name()); - metadata.codec = columns_[i]->GetParquetCodec(); - current_row_group_->columns[i].__set_meta_data(metadata); - } - - return Status::OK(); -} - -int64_t HdfsParquetTableWriter::MinBlockSize(int64_t num_file_cols) const { - // See file_size_limit_ calculation in InitNewFile(). - return 3 * DEFAULT_DATA_PAGE_SIZE * num_file_cols; -} - -uint64_t HdfsParquetTableWriter::default_block_size() const { - int64_t block_size; - if (state_->query_options().__isset.parquet_file_size && - state_->query_options().parquet_file_size > 0) { - // If the user specified a value explicitly, use it. InitNewFile() will verify that - // the actual file's block size is sufficient. - block_size = state_->query_options().parquet_file_size; - } else { - block_size = HDFS_BLOCK_SIZE; - // Blocks are usually HDFS_BLOCK_SIZE bytes, unless there are many columns, in - // which case a per-column minimum kicks in. - block_size = max(block_size, MinBlockSize(columns_.size())); - } - // HDFS does not like block sizes that are not aligned - return BitUtil::RoundUp(block_size, HDFS_BLOCK_ALIGNMENT); -} - -Status HdfsParquetTableWriter::InitNewFile() { - DCHECK(current_row_group_ == nullptr); - - per_file_mem_pool_->Clear(); - - // Get the file limit - file_size_limit_ = output_->block_size; - if (file_size_limit_ < HDFS_MIN_FILE_SIZE) { - stringstream ss; - ss << "Hdfs file size (" << file_size_limit_ << ") is too small."; - return Status(ss.str()); - } - - // We want to output HDFS files that are no more than file_size_limit_. If we - // go over the limit, HDFS will split the file into multiple blocks which - // is undesirable. If we are under the limit, we potentially end up with more - // files than necessary. Either way, it is not going to generate a invalid - // file. - // With arbitrary encoding schemes, it is not possible to know if appending - // a new row will push us over the limit until after encoding it. Rolling back - // a row can be tricky as well so instead we will stop the file when it is - // 2 * DEFAULT_DATA_PAGE_SIZE * num_cols short of the limit. e.g. 50 cols with 8K data - // pages, means we stop 800KB shy of the limit. - // Data pages calculate their size precisely when they are complete so having - // a two page buffer guarantees we will never go over (unless there are huge values - // that require increasing the page size). - // TODO: this should be made dynamic based on the size of rows seen so far. - // This would for example, let us account for very long string columns. - const int64_t num_cols = columns_.size(); - if (file_size_limit_ < MinBlockSize(num_cols)) { - stringstream ss; - ss << "Parquet file size " << file_size_limit_ << " bytes is too small for " - << "a table with " << num_cols << " non-partitioning columns. Set query option " - << "PARQUET_FILE_SIZE to at least " << MinBlockSize(num_cols) << "."; - return Status(ss.str()); - } - file_size_limit_ -= 2 * DEFAULT_DATA_PAGE_SIZE * columns_.size(); - DCHECK_GE(file_size_limit_, - static_cast<int64_t>(DEFAULT_DATA_PAGE_SIZE * columns_.size())); - file_pos_ = 0; - row_count_ = 0; - file_size_estimate_ = 0; - - file_metadata_.row_groups.clear(); - RETURN_IF_ERROR(AddRowGroup()); - RETURN_IF_ERROR(WriteFileHeader()); - - return Status::OK(); -} - -Status HdfsParquetTableWriter::AppendRows( - RowBatch* batch, const vector<int32_t>& row_group_indices, bool* new_file) { - SCOPED_TIMER(parent_->encode_timer()); - *new_file = false; - int limit; - if (row_group_indices.empty()) { - limit = batch->num_rows(); - } else { - limit = row_group_indices.size(); - } - - bool all_rows = row_group_indices.empty(); - for (; row_idx_ < limit;) { - TupleRow* current_row = all_rows ? - batch->GetRow(row_idx_) : batch->GetRow(row_group_indices[row_idx_]); - for (int j = 0; j < columns_.size(); ++j) { - RETURN_IF_ERROR(columns_[j]->AppendRow(current_row)); - } - ++row_idx_; - ++row_count_; - ++output_->num_rows; - - if (file_size_estimate_ > file_size_limit_) { - // This file is full. We need a new file. - *new_file = true; - return Status::OK(); - } - } - - // We exhausted the batch, so we materialize the statistics before releasing the memory. - for (unique_ptr<BaseColumnWriter>& column : columns_) { - RETURN_IF_ERROR(column->MaterializeStatsValues()); - } - - // Reset the row_idx_ when we exhaust the batch. We can exit before exhausting - // the batch if we run out of file space and will continue from the last index. - row_idx_ = 0; - return Status::OK(); -} - -Status HdfsParquetTableWriter::Finalize() { - SCOPED_TIMER(parent_->hdfs_write_timer()); - - // At this point we write out the rest of the file. We first update the file - // metadata, now that all the values have been seen. - file_metadata_.num_rows = row_count_; - - // Set the ordering used to write parquet statistics for columns in the file. - parquet::ColumnOrder col_order = parquet::ColumnOrder(); - col_order.__set_TYPE_ORDER(parquet::TypeDefinedOrder()); - file_metadata_.column_orders.assign(columns_.size(), col_order); - file_metadata_.__isset.column_orders = true; - - RETURN_IF_ERROR(FlushCurrentRowGroup()); - RETURN_IF_ERROR(WritePageIndex()); - for (auto& column : columns_) column->Reset(); - RETURN_IF_ERROR(WriteFileFooter()); - *stats_.mutable_parquet_stats() = parquet_dml_stats_; - COUNTER_ADD(parent_->rows_inserted_counter(), row_count_); - return Status::OK(); -} - -void HdfsParquetTableWriter::Close() { - // Release all accumulated memory - for (int i = 0; i < columns_.size(); ++i) { - columns_[i]->Close(); - } - reusable_col_mem_pool_->FreeAll(); - per_file_mem_pool_->FreeAll(); - compression_staging_buffer_.clear(); -} - -Status HdfsParquetTableWriter::WriteFileHeader() { - DCHECK_EQ(file_pos_, 0); - RETURN_IF_ERROR(Write(PARQUET_VERSION_NUMBER, sizeof(PARQUET_VERSION_NUMBER))); - file_pos_ += sizeof(PARQUET_VERSION_NUMBER); - file_size_estimate_ += sizeof(PARQUET_VERSION_NUMBER); - return Status::OK(); -} - -Status HdfsParquetTableWriter::FlushCurrentRowGroup() { - if (current_row_group_ == nullptr) return Status::OK(); - - int num_clustering_cols = table_desc_->num_clustering_cols(); - for (int i = 0; i < columns_.size(); ++i) { - int64_t data_page_offset, dict_page_offset; - // Flush this column. This updates the final metadata sizes for this column. - RETURN_IF_ERROR(columns_[i]->Flush(&file_pos_, &data_page_offset, &dict_page_offset)); - DCHECK_GT(data_page_offset, 0); - - parquet::ColumnChunk& col_chunk = current_row_group_->columns[i]; - parquet::ColumnMetaData& col_metadata = col_chunk.meta_data; - col_metadata.data_page_offset = data_page_offset; - if (dict_page_offset >= 0) { - col_metadata.__set_dictionary_page_offset(dict_page_offset); - } - - BaseColumnWriter* col_writer = columns_[i].get(); - col_metadata.num_values = col_writer->num_values(); - col_metadata.total_uncompressed_size = col_writer->total_uncompressed_size(); - col_metadata.total_compressed_size = col_writer->total_compressed_size(); - current_row_group_->total_byte_size += col_writer->total_compressed_size(); - current_row_group_->num_rows = col_writer->num_values(); - current_row_group_->columns[i].file_offset = file_pos_; - const string& col_name = table_desc_->col_descs()[i + num_clustering_cols].name(); - google::protobuf::Map<string,int64>* column_size_map = - parquet_dml_stats_.mutable_per_column_size(); - (*column_size_map)[col_name] += col_writer->total_compressed_size(); - - // Write encodings and encoding stats for this column - col_metadata.encodings.clear(); - for (parquet::Encoding::type encoding : col_writer->column_encodings_) { - col_metadata.encodings.push_back(encoding); - } - - vector<parquet::PageEncodingStats> encoding_stats; - // Add dictionary page encoding stats - for (const auto& entry: col_writer->dict_encoding_stats_) { - parquet::PageEncodingStats dict_enc_stat; - dict_enc_stat.page_type = parquet::PageType::DICTIONARY_PAGE; - dict_enc_stat.encoding = entry.first; - dict_enc_stat.count = entry.second; - encoding_stats.push_back(dict_enc_stat); - } - // Add data page encoding stats - for (const auto& entry: col_writer->data_encoding_stats_) { - parquet::PageEncodingStats data_enc_stat; - data_enc_stat.page_type = parquet::PageType::DATA_PAGE; - data_enc_stat.encoding = entry.first; - data_enc_stat.count = entry.second; - encoding_stats.push_back(data_enc_stat); - } - col_metadata.__set_encoding_stats(encoding_stats); - - // Build column statistics and add them to the header. - col_writer->EncodeRowGroupStats(¤t_row_group_->columns[i].meta_data); - - // Since we don't supported complex schemas, all columns should have the same - // number of values. - DCHECK_EQ(current_row_group_->columns[0].meta_data.num_values, - col_writer->num_values()); - - // Metadata for this column is complete, write it out to file. The column metadata - // goes at the end so that when we have collocated files, the column data can be - // written without buffering. - uint8_t* buffer = nullptr; - uint32_t len = 0; - RETURN_IF_ERROR(thrift_serializer_->SerializeToBuffer( - ¤t_row_group_->columns[i], &len, &buffer)); - RETURN_IF_ERROR(Write(buffer, len)); - file_pos_ += len; - } - - // Populate RowGroup::sorting_columns with all columns specified by the Frontend. - for (int col_idx : parent_->sort_columns()) { - current_row_group_->sorting_columns.push_back(parquet::SortingColumn()); - parquet::SortingColumn& sorting_column = current_row_group_->sorting_columns.back(); - sorting_column.column_idx = col_idx; - sorting_column.descending = false; - sorting_column.nulls_first = false; - } - current_row_group_->__isset.sorting_columns = - !current_row_group_->sorting_columns.empty(); - - current_row_group_ = nullptr; - return Status::OK(); -} - -Status HdfsParquetTableWriter::WritePageIndex() { - if (!FLAGS_enable_parquet_page_index_writing_debug_only) return Status::OK(); - - // Currently Impala only write Parquet files with a single row group. The current - // page index logic depends on this behavior as it only keeps one row group's - // statistics in memory. - DCHECK_EQ(file_metadata_.row_groups.size(), 1); - - parquet::RowGroup* row_group = &(file_metadata_.row_groups[0]); - // Write out the column indexes. - for (int i = 0; i < columns_.size(); ++i) { - auto& column = *columns_[i]; - if (!column.valid_column_index_) continue; - column.column_index_.__set_boundary_order( - column.row_group_stats_base_->GetBoundaryOrder()); - // We always set null_counts. - column.column_index_.__isset.null_counts = true; - uint8_t* buffer = nullptr; - uint32_t len = 0; - RETURN_IF_ERROR(thrift_serializer_->SerializeToBuffer( - &column.column_index_, &len, &buffer)); - RETURN_IF_ERROR(Write(buffer, len)); - // Update the column_index_offset and column_index_length of the ColumnChunk - row_group->columns[i].__set_column_index_offset(file_pos_); - row_group->columns[i].__set_column_index_length(len); - file_pos_ += len; - } - // Write out the offset indexes. - for (int i = 0; i < columns_.size(); ++i) { - auto& column = *columns_[i]; - uint8_t* buffer = nullptr; - uint32_t len = 0; - RETURN_IF_ERROR(thrift_serializer_->SerializeToBuffer( - &column.offset_index_, &len, &buffer)); - RETURN_IF_ERROR(Write(buffer, len)); - // Update the offset_index_offset and offset_index_length of the ColumnChunk - row_group->columns[i].__set_offset_index_offset(file_pos_); - row_group->columns[i].__set_offset_index_length(len); - file_pos_ += len; - } - return Status::OK(); -} - -Status HdfsParquetTableWriter::WriteFileFooter() { - // Write file_meta_data - uint32_t file_metadata_len = 0; - uint8_t* buffer = nullptr; - RETURN_IF_ERROR(thrift_serializer_->SerializeToBuffer( - &file_metadata_, &file_metadata_len, &buffer)); - RETURN_IF_ERROR(Write(buffer, file_metadata_len)); - - // Write footer - RETURN_IF_ERROR(Write<uint32_t>(file_metadata_len)); - RETURN_IF_ERROR(Write(PARQUET_VERSION_NUMBER, sizeof(PARQUET_VERSION_NUMBER))); - return Status::OK(); -}
