IMPALA-4734: Set parquet::RowGroup::sorting_columns

This changes the HdfsParquetTableWriter to populate the
parquet::RowGroup::sorting_columns list with all columns mentioned in a
'sortby()' hint within INSERT statements. The columns are added to the
list in the order in which they appear inside the hint.
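For illustration only (not part of the original commit message), here is a
minimal sketch of the per-row-group metadata this produces, written with the
thrift-generated parquet Python bindings that the new test below imports. The
column indices assume the test's scenario of inserting into a table shaped
like functional.alltypessmall with /* +sortby(int_col, id) */; treat the
values as an example, not a specification.

  # Sketch of the RowGroup::sorting_columns entries written for
  # "INSERT ... /* +sortby(int_col, id) */": one entry per hint column, in
  # hint order. column_idx is the position within the target table's
  # non-clustering columns (int_col -> 4, id -> 0 here); descending and
  # nulls_first stay False, matching the ASC NULLS LAST sort order the
  # frontend now produces for sortby() columns.
  from parquet.ttypes import SortingColumn

  expected_sorting_columns = [
    SortingColumn(column_idx=4, descending=False, nulls_first=False),
    SortingColumn(column_idx=0, descending=False, nulls_first=False),
  ]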
The change also adds backports.tempfile to the Python requirements to
provide 'tempfile.TemporaryDirectory' on Python 2.7.

It also changes the default ordering for columns mentioned in 'sortby()'
hints from descending to ascending.

To test this change, we write a table with a 'sortby()' hint and verify
that the sorting_columns get populated correctly.

Change-Id: Ib42aab585e9e627796e9510e783652d49d74b56c
Reviewed-on: http://gerrit.cloudera.org:8080/6219
Reviewed-by: Lars Volker <[email protected]>
Tested-by: Impala Public Jenkins

Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/768fc0ea
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/768fc0ea
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/768fc0ea

Branch: refs/heads/master
Commit: 768fc0ea2773289b88256ea16090c0cfcf2d0a97
Parents: 5d306ef
Author: Lars Volker <[email protected]>
Authored: Mon Feb 27 11:21:40 2017 -0800
Committer: Impala Public Jenkins <[email protected]>
Committed: Tue Mar 7 09:07:05 2017 +0000

----------------------------------------------------------------------
 be/src/exec/hdfs-parquet-table-writer.cc        | 16 +++++++--
 be/src/exec/hdfs-parquet-table-writer.h         |  2 +-
 be/src/exec/hdfs-table-sink.cc                  |  1 +
 be/src/exec/hdfs-table-sink.h                   |  7 +++-
 common/thrift/DataSinks.thrift                  |  5 +++
 .../org/apache/impala/analysis/DeleteStmt.java  |  3 +-
 .../org/apache/impala/analysis/InsertStmt.java  |  9 ++++-
 .../org/apache/impala/analysis/UpdateStmt.java  |  3 +-
 .../apache/impala/planner/HdfsTableSink.java    | 10 +++++-
 .../java/org/apache/impala/planner/Planner.java |  2 +-
 .../org/apache/impala/planner/TableSink.java    | 14 ++++++--
 infra/python/deps/requirements.txt              |  1 +
 .../queries/PlannerTest/insert.test             | 24 ++++++-------
 .../queries/PlannerTest/kudu-upsert.test        |  2 +-
 .../queries/PlannerTest/kudu.test               |  8 ++---
 tests/query_test/test_insert_parquet.py         | 38 ++++++++++++++++++++
 16 files changed, 117 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/768fc0ea/be/src/exec/hdfs-parquet-table-writer.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-table-writer.cc b/be/src/exec/hdfs-parquet-table-writer.cc
index 164e921..5c2d24c 100644
--- a/be/src/exec/hdfs-parquet-table-writer.cc
+++ b/be/src/exec/hdfs-parquet-table-writer.cc
@@ -998,7 +998,7 @@ Status HdfsParquetTableWriter::Finalize() {
   file_metadata_.num_rows = row_count_;
   RETURN_IF_ERROR(FlushCurrentRowGroup());
   RETURN_IF_ERROR(WriteFileFooter());
-  stats_.__set_parquet_stats(parquet_stats_);
+  stats_.__set_parquet_stats(parquet_insert_stats_);
   COUNTER_ADD(parent_->rows_inserted_counter(), row_count_);
   return Status::OK();
 }
@@ -1046,7 +1046,8 @@ Status HdfsParquetTableWriter::FlushCurrentRowGroup() {
     current_row_group_->num_rows = col_writer->num_values();
     current_row_group_->columns[i].file_offset = file_pos_;
     const string& col_name = table_desc_->col_descs()[i + num_clustering_cols].name();
-    parquet_stats_.per_column_size[col_name] += col_writer->total_compressed_size();
+    parquet_insert_stats_.per_column_size[col_name] +=
+        col_writer->total_compressed_size();

     // Write encodings and encoding stats for this column
     col_metadata.encodings.clear();
@@ -1094,6 +1095,17 @@ Status HdfsParquetTableWriter::FlushCurrentRowGroup() {
     col_writer->Reset();
   }

+  // Populate RowGroup::sorting_columns with all columns specified by the Frontend.
+  for (int col_idx : parent_->sort_by_columns()) {
+    current_row_group_->sorting_columns.push_back(parquet::SortingColumn());
+    parquet::SortingColumn& sorting_column = current_row_group_->sorting_columns.back();
+    sorting_column.column_idx = col_idx;
+    sorting_column.descending = false;
+    sorting_column.nulls_first = false;
+  }
+  current_row_group_->__isset.sorting_columns =
+      !current_row_group_->sorting_columns.empty();
+
   current_row_group_ = nullptr;
   return Status::OK();
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/768fc0ea/be/src/exec/hdfs-parquet-table-writer.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-table-writer.h b/be/src/exec/hdfs-parquet-table-writer.h
index d4fbd94..8cbd456 100644
--- a/be/src/exec/hdfs-parquet-table-writer.h
+++ b/be/src/exec/hdfs-parquet-table-writer.h
@@ -187,7 +187,7 @@ class HdfsParquetTableWriter : public HdfsTableWriter {
   std::vector<uint8_t> compression_staging_buffer_;

   /// For each column, the on disk size written.
-  TParquetInsertStats parquet_stats_;
+  TParquetInsertStats parquet_insert_stats_;
 };

 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/768fc0ea/be/src/exec/hdfs-table-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-table-sink.cc b/be/src/exec/hdfs-table-sink.cc
index 8d64683..4aa09cd 100644
--- a/be/src/exec/hdfs-table-sink.cc
+++ b/be/src/exec/hdfs-table-sink.cc
@@ -69,6 +69,7 @@ HdfsTableSink::HdfsTableSink(const RowDescriptor& row_desc,
     partition_key_texprs_(tsink.table_sink.hdfs_table_sink.partition_key_exprs),
     overwrite_(tsink.table_sink.hdfs_table_sink.overwrite),
     input_is_clustered_(tsink.table_sink.hdfs_table_sink.input_is_clustered),
+    sort_by_columns_(tsink.table_sink.hdfs_table_sink.sort_by_columns),
     current_clustered_partition_(nullptr) {
   DCHECK(tsink.__isset.table_sink);
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/768fc0ea/be/src/exec/hdfs-table-sink.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-table-sink.h b/be/src/exec/hdfs-table-sink.h
index 65c1798..45d64c5 100644
--- a/be/src/exec/hdfs-table-sink.h
+++ b/be/src/exec/hdfs-table-sink.h
@@ -152,7 +152,7 @@ class HdfsTableSink : public DataSink {
   virtual void Close(RuntimeState* state);

   int skip_header_line_count() const { return skip_header_line_count_; }
-
+  const vector<int32_t>& sort_by_columns() const { return sort_by_columns_; }
   const HdfsTableDescriptor& TableDesc() { return *table_desc_; }

   RuntimeProfile::Counter* rows_inserted_counter() { return rows_inserted_counter_; }
@@ -270,6 +270,11 @@ class HdfsTableSink : public DataSink {
   /// be opened, written, and closed one by one.
   bool input_is_clustered_;

+  // Stores the indices into the list of non-clustering columns of the target table that
+  // are mentioned in the 'sortby()' hint. This is used in the backend to populate the
+  // RowGroup::sorting_columns list in parquet files.
+  const std::vector<int32_t>& sort_by_columns_;
+
   /// Stores the current partition during clustered inserts across subsequent row batches.
   /// Only set if 'input_is_clustered_' is true.
   PartitionPair* current_clustered_partition_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/768fc0ea/common/thrift/DataSinks.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/DataSinks.thrift b/common/thrift/DataSinks.thrift
index 0b136b2..6e3224e 100644
--- a/common/thrift/DataSinks.thrift
+++ b/common/thrift/DataSinks.thrift
@@ -70,6 +70,11 @@
   // This property indicates to the table sink whether the input is ordered by the
   // partition keys, meaning partitions can be opened, written, and closed one by one.
   4: required bool input_is_clustered
+
+  // Stores the indices into the list of non-clustering columns of the target table that
+  // are mentioned in the 'sortby()' hint. This is used in the backend to populate the
+  // RowGroup::sorting_columns list in parquet files.
+  5: optional list<i32> sort_by_columns
 }

 // Structure to encapsulate specific options that are passed down to the KuduTableSink

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/768fc0ea/fe/src/main/java/org/apache/impala/analysis/DeleteStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/DeleteStmt.java b/fe/src/main/java/org/apache/impala/analysis/DeleteStmt.java
index 2f7f670..52d58a7 100644
--- a/fe/src/main/java/org/apache/impala/analysis/DeleteStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/DeleteStmt.java
@@ -56,7 +56,8 @@ public class DeleteStmt extends ModifyStmt {
     // analyze() must have been called before.
     Preconditions.checkState(table_ != null);
     TableSink tableSink = TableSink.create(table_, TableSink.Op.DELETE,
-        ImmutableList.<Expr>of(), referencedColumns_, false, false);
+        ImmutableList.<Expr>of(), referencedColumns_, false, false,
+        ImmutableList.<Integer>of());
     Preconditions.checkState(!referencedColumns_.isEmpty());
     return tableSink;
   }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/768fc0ea/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java b/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
index 916f97f..b7172da 100644
--- a/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
@@ -130,6 +130,11 @@ public class InsertStmt extends StatementBase {
   // contain primary key columns.
   private List<Expr> sortByExprs_ = Lists.newArrayList();

+  // Stores the indices into the list of non-clustering columns of the target table that
+  // are mentioned in the 'sortby()' hint. This is sent to the backend to populate the
+  // RowGroup::sorting_columns list in parquet files.
+  private List<Integer> sortByColumns_ = Lists.newArrayList();
+
   // Output expressions that produce the final results to write to the target table. May
   // include casts. Set in prepareExpressions().
   // If this is an INSERT on a non-Kudu table, it will contain one Expr for all
@@ -804,6 +809,7 @@ public class InsertStmt extends StatementBase {
     for (int i = 0; i < columns.size(); ++i) {
       if (columns.get(i).getName().equals(columnName)) {
         sortByExprs_.add(resultExprs_.get(i));
+        sortByColumns_.add(i);
         foundColumn = true;
         break;
       }
@@ -854,7 +860,8 @@ public class InsertStmt extends StatementBase {
     // analyze() must have been called before.
     Preconditions.checkState(table_ != null);
     return TableSink.create(table_, isUpsert_ ? TableSink.Op.UPSERT : TableSink.Op.INSERT,
-        partitionKeyExprs_, mentionedColumns_, overwrite_, hasClusteredHint_);
+        partitionKeyExprs_, mentionedColumns_, overwrite_, hasClusteredHint_,
+        sortByColumns_);
   }

   /**

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/768fc0ea/fe/src/main/java/org/apache/impala/analysis/UpdateStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/UpdateStmt.java b/fe/src/main/java/org/apache/impala/analysis/UpdateStmt.java
index de74bd8..ddce618 100644
--- a/fe/src/main/java/org/apache/impala/analysis/UpdateStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/UpdateStmt.java
@@ -65,7 +65,8 @@ public class UpdateStmt extends ModifyStmt {
     // analyze() must have been called before.
     Preconditions.checkState(table_ != null);
     DataSink dataSink = TableSink.create(table_, TableSink.Op.UPDATE,
-        ImmutableList.<Expr>of(), referencedColumns_, false, false);
+        ImmutableList.<Expr>of(), referencedColumns_, false, false,
+        ImmutableList.<Integer>of());
     Preconditions.checkState(!referencedColumns_.isEmpty());
     return dataSink;
   }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/768fc0ea/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java b/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java
index fc7f9b1..996f981 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java
@@ -32,6 +32,7 @@ import org.apache.impala.thrift.THdfsTableSink;
 import org.apache.impala.thrift.TTableSink;
 import org.apache.impala.thrift.TTableSinkType;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;

 /**
  * Base class for Hdfs data sinks such as HdfsTextTableSink.
@@ -50,13 +51,19 @@ public class HdfsTableSink extends TableSink {
   // be opened, written, and closed one by one.
   protected final boolean inputIsClustered_;

+  // Stores the indices into the list of non-clustering columns of the target table that
+  // are mentioned in the 'sortby()' hint. This is sent to the backend to populate the
+  // RowGroup::sorting_columns list in parquet files.
+  private List<Integer> sortByColumns_ = Lists.newArrayList();
+
   public HdfsTableSink(Table targetTable, List<Expr> partitionKeyExprs,
-      boolean overwrite, boolean inputIsClustered) {
+      boolean overwrite, boolean inputIsClustered, List<Integer> sortByColumns) {
     super(targetTable, Op.INSERT);
     Preconditions.checkState(targetTable instanceof HdfsTable);
     partitionKeyExprs_ = partitionKeyExprs;
     overwrite_ = overwrite;
     inputIsClustered_ = inputIsClustered;
+    sortByColumns_ = sortByColumns;
   }

   @Override
@@ -154,6 +161,7 @@ public class HdfsTableSink extends TableSink {
     if (skipHeaderLineCount > 0) {
       hdfsTableSink.setSkip_header_line_count(skipHeaderLineCount);
     }
+    hdfsTableSink.setSort_by_columns(sortByColumns_);
     TTableSink tTableSink = new TTableSink(DescriptorTable.TABLE_SINK_ID,
         TTableSinkType.HDFS, sinkOp_.toThrift());
     tTableSink.hdfs_table_sink = hdfsTableSink;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/768fc0ea/fe/src/main/java/org/apache/impala/planner/Planner.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/Planner.java b/fe/src/main/java/org/apache/impala/planner/Planner.java
index cc8b39b..8842c9c 100644
--- a/fe/src/main/java/org/apache/impala/planner/Planner.java
+++ b/fe/src/main/java/org/apache/impala/planner/Planner.java
@@ -548,7 +548,7 @@ public class Planner {
     if (orderingExprs.isEmpty()) return;

     // Build sortinfo to sort by the ordering exprs.
-    List<Boolean> isAscOrder = Collections.nCopies(orderingExprs.size(), false);
+    List<Boolean> isAscOrder = Collections.nCopies(orderingExprs.size(), true);
     List<Boolean> nullsFirstParams = Collections.nCopies(orderingExprs.size(), false);
     SortInfo sortInfo = new SortInfo(orderingExprs, isAscOrder, nullsFirstParams);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/768fc0ea/fe/src/main/java/org/apache/impala/planner/TableSink.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/TableSink.java b/fe/src/main/java/org/apache/impala/planner/TableSink.java
index fb3cea2..8595eea 100644
--- a/fe/src/main/java/org/apache/impala/planner/TableSink.java
+++ b/fe/src/main/java/org/apache/impala/planner/TableSink.java
@@ -87,16 +87,22 @@ public abstract class TableSink extends DataSink {
    * Not all Ops are supported for all tables.
    * All parameters must be non-null, the lists in particular need to be empty if they
    * don't make sense for a certain table type.
+   * For HDFS tables 'sortByColumns' specifies the indices into the list of non-clustering
+   * columns of the target table that are mentioned in the 'sortby()' hint.
    */
   public static TableSink create(Table table, Op sinkAction,
       List<Expr> partitionKeyExprs, List<Integer> referencedColumns,
-      boolean overwrite, boolean inputIsClustered) {
+      boolean overwrite, boolean inputIsClustered, List<Integer> sortByColumns) {
+    Preconditions.checkNotNull(partitionKeyExprs);
+    Preconditions.checkNotNull(referencedColumns);
+    Preconditions.checkNotNull(sortByColumns);
     if (table instanceof HdfsTable) {
       // Hdfs only supports inserts.
       Preconditions.checkState(sinkAction == Op.INSERT);
       // Referenced columns don't make sense for an Hdfs table.
       Preconditions.checkState(referencedColumns.isEmpty());
-      return new HdfsTableSink(table, partitionKeyExprs, overwrite, inputIsClustered);
+      return new HdfsTableSink(table, partitionKeyExprs, overwrite, inputIsClustered,
+          sortByColumns);
     } else if (table instanceof HBaseTable) {
       // HBase only supports inserts.
       Preconditions.checkState(sinkAction == Op.INSERT);
@@ -106,6 +112,8 @@ public abstract class TableSink extends DataSink {
       Preconditions.checkState(overwrite == false);
       // Referenced columns don't make sense for an HBase table.
       Preconditions.checkState(referencedColumns.isEmpty());
+      // sortby() hint is not supported for HBase tables.
+      Preconditions.checkState(sortByColumns.isEmpty());
       // Create the HBaseTableSink and return it.
       return new HBaseTableSink(table);
     } else if (table instanceof KuduTable) {
@@ -113,6 +121,8 @@ public abstract class TableSink extends DataSink {
       Preconditions.checkState(overwrite == false);
       // Partition clauses don't make sense for Kudu inserts.
       Preconditions.checkState(partitionKeyExprs.isEmpty());
+      // sortby() hint is not supported for Kudu tables.
+      Preconditions.checkState(sortByColumns.isEmpty());
       return new KuduTableSink(table, sinkAction, referencedColumns);
     } else {
       throw new UnsupportedOperationException(

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/768fc0ea/infra/python/deps/requirements.txt
----------------------------------------------------------------------
diff --git a/infra/python/deps/requirements.txt b/infra/python/deps/requirements.txt
index 7d9d484..c068c83 100644
--- a/infra/python/deps/requirements.txt
+++ b/infra/python/deps/requirements.txt
@@ -23,6 +23,7 @@
 # multiple times (though maybe they could be).

 allpairs == 2.0.1
+backports.tempfile == 1.0rc1
 boto3 == 1.2.3
 simplejson == 3.3.0 # For python version 2.6
 botocore == 1.3.30

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/768fc0ea/testdata/workloads/functional-planner/queries/PlannerTest/insert.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/insert.test b/testdata/workloads/functional-planner/queries/PlannerTest/insert.test
index 8e3adb9..5f54f86 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/insert.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/insert.test
@@ -578,7 +578,7 @@ WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(year,month)
 |  partitions=24
 |
 01:SORT
-|  order by: year DESC NULLS LAST, month DESC NULLS LAST
+|  order by: year ASC NULLS LAST, month ASC NULLS LAST
 |
 00:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
@@ -587,7 +587,7 @@ WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(year,month)
 |  partitions=24
 |
 02:SORT
-|  order by: year DESC NULLS LAST, month DESC NULLS LAST
+|  order by: year ASC NULLS LAST, month ASC NULLS LAST
 |
 01:EXCHANGE [HASH(functional.alltypes.year,functional.alltypes.month)]
 |
@@ -602,7 +602,7 @@ WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(year,month)
 |  partitions=24
 |
 01:SORT
-|  order by: year DESC NULLS LAST, month DESC NULLS LAST
+|  order by: year ASC NULLS LAST, month ASC NULLS LAST
 |
 00:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
@@ -611,7 +611,7 @@ WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(year,month)
 |  partitions=24
 |
 01:SORT
-|  order by: year DESC NULLS LAST, month DESC NULLS LAST
+|  order by: year ASC NULLS LAST, month ASC NULLS LAST
 |
 00:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
@@ -626,7 +626,7 @@ WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(year,month)
 |  partitions=24
 |
 04:SORT
-|  order by: year DESC NULLS LAST, month DESC NULLS LAST
+|  order by: year ASC NULLS LAST, month ASC NULLS LAST
 |
 03:HASH JOIN [LEFT SEMI JOIN]
 |  hash predicates: int_col = max(int_col)
@@ -646,7 +646,7 @@ WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(year,month)
 |  partitions=24
 |
 08:SORT
-|  order by: year DESC NULLS LAST, month DESC NULLS LAST
+|  order by: year ASC NULLS LAST, month ASC NULLS LAST
 |
 07:EXCHANGE [HASH(functional.alltypes.year,functional.alltypes.month)]
 |
@@ -714,7 +714,7 @@ WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(year,month)
 |  partitions=24
 |
 01:SORT
-|  order by: int_col DESC NULLS LAST, bool_col DESC NULLS LAST
+|  order by: int_col ASC NULLS LAST, bool_col ASC NULLS LAST
 |
 00:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
@@ -723,7 +723,7 @@ WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(year,month)
 |  partitions=24
 |
 02:SORT
-|  order by: int_col DESC NULLS LAST, bool_col DESC NULLS LAST
+|  order by: int_col ASC NULLS LAST, bool_col ASC NULLS LAST
 |
 01:EXCHANGE [HASH(functional.alltypes.year,functional.alltypes.month)]
 |
@@ -739,7 +739,7 @@ WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(year,month)
 |  partitions=24
 |
 01:SORT
-|  order by: int_col DESC NULLS LAST, bool_col DESC NULLS LAST
+|  order by: int_col ASC NULLS LAST, bool_col ASC NULLS LAST
 |
 00:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
@@ -748,7 +748,7 @@ WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(year,month)
 |  partitions=24
 |
 01:SORT
-|  order by: int_col DESC NULLS LAST, bool_col DESC NULLS LAST
+|  order by: int_col ASC NULLS LAST, bool_col ASC NULLS LAST
 |
 00:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
@@ -762,7 +762,7 @@ WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(year,month)
 |  partitions=24
 |
 01:SORT
-|  order by: year DESC NULLS LAST, month DESC NULLS LAST, int_col DESC NULLS LAST, bool_col DESC NULLS LAST
+|  order by: year ASC NULLS LAST, month ASC NULLS LAST, int_col ASC NULLS LAST, bool_col ASC NULLS LAST
 |
 00:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
@@ -771,7 +771,7 @@ WRITE TO HDFS [functional.alltypes, OVERWRITE=false, PARTITION-KEYS=(year,month)
 |  partitions=24
 |
 02:SORT
-|  order by: year DESC NULLS LAST, month DESC NULLS LAST, int_col DESC NULLS LAST, bool_col DESC NULLS LAST
+|  order by: year ASC NULLS LAST, month ASC NULLS LAST, int_col ASC NULLS LAST, bool_col ASC NULLS LAST
 |
 01:EXCHANGE [HASH(functional.alltypes.year,functional.alltypes.month)]
 |

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/768fc0ea/testdata/workloads/functional-planner/queries/PlannerTest/kudu-upsert.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/kudu-upsert.test b/testdata/workloads/functional-planner/queries/PlannerTest/kudu-upsert.test
index b106b02..bbcb014 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/kudu-upsert.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/kudu-upsert.test
@@ -86,7 +86,7 @@ select * from functional_kudu.testtbl
 UPSERT INTO KUDU [functional_kudu.testtbl]
 |
 01:SORT
-|  order by: id DESC NULLS LAST
+|  order by: id ASC NULLS LAST
 |
 00:SCAN KUDU [functional_kudu.testtbl]
 ====

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/768fc0ea/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test b/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test
index 1549ec7..6b5291a 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test
@@ -225,14 +225,14 @@ select * from functional_kudu.alltypes
 INSERT INTO KUDU [functional_kudu.alltypes]
 |
 01:SORT
-|  order by: id DESC NULLS LAST
+|  order by: id ASC NULLS LAST
 |
 00:SCAN KUDU [functional_kudu.alltypes]
 ---- DISTRIBUTEDPLAN
 INSERT INTO KUDU [functional_kudu.alltypes]
 |
 01:SORT
-|  order by: id DESC NULLS LAST
+|  order by: id ASC NULLS LAST
 |
 00:SCAN KUDU [functional_kudu.alltypes]
 ====
@@ -247,7 +247,7 @@ from functional_kudu.testtbl group by id, name
 INSERT INTO KUDU [functional_kudu.testtbl]
 |
 02:SORT
-|  order by: id DESC NULLS LAST
+|  order by: id ASC NULLS LAST
 |
 01:AGGREGATE [FINALIZE]
 |  output: max(zip)
@@ -258,7 +258,7 @@ INSERT INTO KUDU [functional_kudu.testtbl]
 |
 04:SORT
-|  order by: id DESC NULLS LAST
+|  order by: id ASC NULLS LAST
 |
 03:AGGREGATE [FINALIZE]
 |  output: max:merge(zip)

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/768fc0ea/tests/query_test/test_insert_parquet.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_insert_parquet.py b/tests/query_test/test_insert_parquet.py
index be2704d..cca5827 100644
--- a/tests/query_test/test_insert_parquet.py
+++ b/tests/query_test/test_insert_parquet.py
@@ -23,6 +23,8 @@ from collections import namedtuple
 from shutil import rmtree
 from subprocess import check_call
 from tempfile import mkdtemp as make_tmp_dir
+from backports.tempfile import TemporaryDirectory
+from parquet.ttypes import SortingColumn

 from tests.common.environ import impalad_basedir
 from tests.common.impala_test_suite import ImpalaTestSuite
@@ -213,6 +215,42 @@ class TestHdfsParquetTableWriter(ImpalaTestSuite):
     self.execute_query("drop table %s" % qualified_table_name)
     rmtree(tmp_dir)

+  def test_sorting_columns(self, vector, unique_database):
+    """Tests that RowGroup::sorting_columns gets populated when specifying a sortby()
+    insert hint."""
+    source_table = "functional_parquet.alltypessmall"
+    target_table = "test_write_sorting_columns"
+    qualified_target_table = "{0}.{1}".format(unique_database, target_table)
+    hdfs_path = get_fs_path("/test-warehouse/{0}.db/{1}/".format(unique_database,
+        target_table))
+
+    # Create table
+    # TODO: Simplify once IMPALA-4167 (insert hints in CTAS) has been fixed.
+ query = "create table {0} like {1} stored as parquet".format(qualified_target_table, + source_table) + self.execute_query(query) + + # Insert data + query = ("insert into {0} partition(year, month) /* +sortby(int_col, id) */ " + "select * from {1}").format(qualified_target_table, source_table) + self.execute_query(query) + + # Download hdfs files and extract rowgroup metadata + row_groups = [] + with TemporaryDirectory() as tmp_dir: + check_call(['hdfs', 'dfs', '-get', hdfs_path, tmp_dir]) + + for root, subdirs, files in os.walk(tmp_dir): + for f in files: + parquet_file = os.path.join(root, str(f)) + file_meta_data = get_parquet_metadata(parquet_file) + row_groups.extend(file_meta_data.row_groups) + + # Verify that the files have the sorted_columns set + expected = [SortingColumn(4, False, False), SortingColumn(0, False, False)] + for row_group in row_groups: + assert row_group.sorting_columns == expected + @SkipIfIsilon.hive @SkipIfLocal.hive
