HIVE-11886 : LLAP: merge master into branch (Sergey Shelukhin)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/f324305a Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/f324305a Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/f324305a Branch: refs/heads/master Commit: f324305a71ac31faa568b8a0078b1e9b217a3570 Parents: 79c7031 e9c8d7c Author: Sergey Shelukhin <[email protected]> Authored: Fri Sep 18 13:35:36 2015 -0700 Committer: Sergey Shelukhin <[email protected]> Committed: Fri Sep 18 13:35:36 2015 -0700 ---------------------------------------------------------------------- .../benchmark/serde/LazySimpleSerDeBench.java | 453 ++++ .../hive/ql/security/FolderPermissionBase.java | 17 +- .../test/resources/testconfiguration.properties | 2 + .../org/apache/hadoop/hive/ql/QTestUtil.java | 9 +- .../llap/io/encoded/OrcEncodedDataReader.java | 3 +- .../hadoop/hive/metastore/HiveMetaStore.java | 6 + pom.xml | 22 +- .../org/apache/hadoop/hive/ql/exec/DDLTask.java | 269 +- .../apache/hadoop/hive/ql/exec/MoveTask.java | 19 +- .../ql/exec/persistence/PTFRowContainer.java | 14 +- .../hive/ql/exec/persistence/RowContainer.java | 12 +- .../ql/exec/tez/tools/KeyValuesInputMerger.java | 1 - .../ql/exec/vector/VectorizationContext.java | 10 +- .../hadoop/hive/ql/hooks/LineageLogger.java | 95 +- .../hadoop/hive/ql/io/orc/OrcInputFormat.java | 150 +- .../apache/hadoop/hive/ql/io/orc/OrcSerde.java | 1 + .../apache/hadoop/hive/ql/io/orc/OrcStruct.java | 2 +- .../hadoop/hive/ql/io/orc/RecordReaderImpl.java | 4 +- .../hive/ql/io/parquet/ProjectionPusher.java | 3 +- .../hive/ql/io/sarg/ConvertAstToSearchArg.java | 4 + .../apache/hadoop/hive/ql/lib/RuleRegExp.java | 61 +- .../ql/optimizer/ColumnPrunerProcFactory.java | 3 + .../hive/ql/optimizer/ConvertJoinMapJoin.java | 4 +- .../calcite/reloperators/HiveBetween.java | 75 + .../optimizer/calcite/reloperators/HiveIn.java | 41 + .../rules/HiveAggregateProjectMergeRule.java | 151 ++ .../calcite/rules/HivePreFilteringRule.java | 37 +- .../calcite/rules/HiveRelFieldTrimmer.java | 145 +- .../translator/PlanModifierForASTConv.java | 4 +- .../translator/SqlFunctionConverter.java | 16 +- .../hive/ql/optimizer/lineage/LineageCtx.java | 8 +- .../hadoop/hive/ql/parse/CalcitePlanner.java | 11 +- .../hive/ql/parse/DDLSemanticAnalyzer.java | 17 + .../apache/hadoop/hive/ql/parse/HiveParser.g | 7 +- .../apache/hadoop/hive/ql/parse/QBSubQuery.java | 7 - .../hadoop/hive/ql/parse/SemanticAnalyzer.java | 2 + .../hive/ql/parse/SemanticAnalyzerFactory.java | 2 + .../hadoop/hive/ql/parse/SubQueryUtils.java | 11 - .../org/apache/hadoop/hive/ql/plan/DDLWork.java | 21 + .../hadoop/hive/ql/plan/HiveOperation.java | 1 + .../hive/ql/plan/ShowCreateDatabaseDesc.java | 94 + .../authorization/plugin/HiveOperationType.java | 1 + .../plugin/sqlstd/Operation2Privilege.java | 2 + .../org/apache/hadoop/hive/ql/udf/UDFJson.java | 2 + .../hive/ql/udf/generic/GenericUDAFMax.java | 16 +- .../exec/persistence/TestPTFRowContainer.java | 31 +- .../hadoop/hive/ql/io/orc/TestOrcStruct.java | 2 + .../clientpositive/drop_table_with_index.q | 35 + .../queries/clientpositive/exchgpartition2lel.q | 32 + ql/src/test/queries/clientpositive/lineage3.q | 26 + .../test/queries/clientpositive/load_orc_part.q | 5 + .../clientpositive/show_create_database.q | 3 + .../queries/clientpositive/subquery_views.q | 22 +- .../queries/clientpositive/vector_char_cast.q | 9 + .../queries/clientpositive/windowing_udaf.q | 4 + .../subquery_exists_implicit_gby.q.out | 8 +- .../subquery_nested_subquery.q.out | 4 +- .../subquery_notexists_implicit_gby.q.out | 8 +- .../subquery_windowing_corr.q.out | 7 +- .../alter_partition_coltype.q.out | 8 +- .../clientpositive/annotate_stats_groupby.q.out | 106 +- .../annotate_stats_groupby2.q.out | 28 +- .../results/clientpositive/auto_join18.q.out | 12 +- .../auto_join18_multi_distinct.q.out | 12 +- .../results/clientpositive/auto_join27.q.out | 18 +- .../results/clientpositive/auto_join32.q.out | 4 +- .../clientpositive/binarysortable_1.q.out | Bin 4329 -> 4325 bytes .../clientpositive/correlationoptimizer2.q.out | 220 +- .../clientpositive/correlationoptimizer6.q.out | 232 +- ql/src/test/results/clientpositive/count.q.out | 14 +- .../results/clientpositive/ctas_colname.q.out | 52 +- .../test/results/clientpositive/database.q.out | 2 +- .../clientpositive/decimal_precision.q.out | 4 +- .../results/clientpositive/decimal_udf.q.out | 30 +- .../results/clientpositive/distinct_stats.q.out | 14 +- .../clientpositive/drop_table_with_index.q.out | 152 ++ .../dynpart_sort_opt_vectorization.q.out | 105 +- .../dynpart_sort_optimization.q.out | 105 +- ...ryption_select_read_only_encrypted_tbl.q.out | 4 +- .../clientpositive/exchgpartition2lel.q.out | 182 ++ .../clientpositive/explain_logical.q.out | 78 +- .../clientpositive/fetch_aggregation.q.out | 4 +- .../test/results/clientpositive/gby_star.q.out | 54 +- .../test/results/clientpositive/groupby12.q.out | 6 +- .../results/clientpositive/groupby5_map.q.out | 4 +- .../clientpositive/groupby5_map_skew.q.out | 4 +- .../results/clientpositive/groupby_cube1.q.out | 12 +- .../groupby_distinct_samekey.q.out | 6 +- .../clientpositive/groupby_grouping_sets2.q.out | 10 +- .../clientpositive/groupby_grouping_sets3.q.out | 12 +- .../clientpositive/groupby_grouping_sets5.q.out | 8 +- .../clientpositive/groupby_grouping_sets6.q.out | 8 +- .../clientpositive/groupby_position.q.out | 36 +- .../clientpositive/groupby_resolution.q.out | 60 +- .../clientpositive/groupby_rollup1.q.out | 12 +- .../clientpositive/groupby_sort_10.q.out | 8 +- .../clientpositive/groupby_sort_11.q.out | 10 +- .../results/clientpositive/groupby_sort_8.q.out | 12 +- ql/src/test/results/clientpositive/having.q.out | 62 +- .../test/results/clientpositive/having2.q.out | 12 +- .../clientpositive/index_auto_mult_tables.q.out | 12 +- .../clientpositive/index_auto_self_join.q.out | 12 +- .../clientpositive/index_auto_update.q.out | 6 +- .../index_bitmap_auto_partitioned.q.out | 6 +- .../index_bitmap_compression.q.out | 6 +- .../infer_bucket_sort_dyn_part.q.out | 4 +- .../infer_bucket_sort_map_operators.q.out | 4 +- ql/src/test/results/clientpositive/join18.q.out | 12 +- .../clientpositive/join18_multi_distinct.q.out | 12 +- ql/src/test/results/clientpositive/join31.q.out | 36 +- .../limit_partition_metadataonly.q.out | 4 +- .../results/clientpositive/limit_pushdown.q.out | 36 +- .../test/results/clientpositive/lineage2.q.out | 2 +- .../test/results/clientpositive/lineage3.q.out | 72 +- .../list_bucket_query_multiskew_3.q.out | 2 +- .../results/clientpositive/load_orc_part.q.out | 26 + .../clientpositive/mapjoin_mapjoin.q.out | 32 +- .../clientpositive/metadata_only_queries.q.out | 4 +- .../results/clientpositive/metadataonly1.q.out | 112 +- .../results/clientpositive/multiMapJoin2.q.out | 226 +- .../nonblock_op_deduplicate.q.out | 8 +- .../results/clientpositive/nonmr_fetch.q.out | 14 +- .../clientpositive/partition_multilevels.q.out | 8 +- .../test/results/clientpositive/ppd_gby.q.out | 12 +- .../test/results/clientpositive/ppd_gby2.q.out | 60 +- .../clientpositive/ppd_join_filter.q.out | 98 +- .../ql_rewrite_gbtoidx_cbo_1.q.out | 168 +- .../ql_rewrite_gbtoidx_cbo_2.q.out | 94 +- .../reduce_deduplicate_extended.q.out | 32 +- .../clientpositive/selectDistinctStar.q.out | 44 +- .../clientpositive/show_create_database.q.out | 19 + .../clientpositive/spark/auto_join18.q.out | 10 +- .../spark/auto_join18_multi_distinct.q.out | 12 +- .../clientpositive/spark/auto_join27.q.out | 18 +- .../clientpositive/spark/auto_join32.q.out | 53 +- .../results/clientpositive/spark/count.q.out | 14 +- .../clientpositive/spark/groupby5_map.q.out | 4 +- .../spark/groupby5_map_skew.q.out | 4 +- .../clientpositive/spark/groupby_cube1.q.out | 12 +- .../clientpositive/spark/groupby_position.q.out | 18 +- .../spark/groupby_resolution.q.out | 60 +- .../clientpositive/spark/groupby_rollup1.q.out | 12 +- .../results/clientpositive/spark/having.q.out | 62 +- .../spark/infer_bucket_sort_map_operators.q.out | 4 +- .../results/clientpositive/spark/join18.q.out | 10 +- .../spark/join18_multi_distinct.q.out | 12 +- .../results/clientpositive/spark/join31.q.out | 36 +- .../spark/limit_partition_metadataonly.q.out | 4 +- .../clientpositive/spark/limit_pushdown.q.out | 34 +- .../clientpositive/spark/mapjoin_mapjoin.q.out | 24 +- .../spark/metadata_only_queries.q.out | 4 +- .../clientpositive/spark/ppd_join_filter.q.out | 90 +- .../spark/ql_rewrite_gbtoidx_cbo_1.q.out | 168 +- .../clientpositive/spark/stats_only_null.q.out | 8 +- .../clientpositive/spark/subquery_in.q.out | 36 +- .../results/clientpositive/spark/union11.q.out | 42 +- .../results/clientpositive/spark/union14.q.out | 28 +- .../results/clientpositive/spark/union15.q.out | 28 +- .../results/clientpositive/spark/union28.q.out | 4 +- .../results/clientpositive/spark/union30.q.out | 4 +- .../results/clientpositive/spark/union33.q.out | 8 +- .../results/clientpositive/spark/union5.q.out | 34 +- .../results/clientpositive/spark/union7.q.out | 28 +- .../clientpositive/spark/union_remove_21.q.out | 4 +- .../spark/vector_count_distinct.q.out | 4 +- .../spark/vector_decimal_aggregate.q.out | 12 +- .../spark/vector_distinct_2.q.out | 28 +- .../clientpositive/spark/vector_groupby_3.q.out | 30 +- .../spark/vector_mapjoin_reduce.q.out | 36 +- .../clientpositive/spark/vector_orderby_5.q.out | 6 +- .../clientpositive/spark/vectorization_0.q.out | 16 +- .../clientpositive/spark/vectorization_13.q.out | 32 +- .../clientpositive/spark/vectorization_15.q.out | 16 +- .../clientpositive/spark/vectorization_16.q.out | 16 +- .../clientpositive/spark/vectorization_9.q.out | 16 +- .../spark/vectorization_pushdown.q.out | 4 +- .../spark/vectorization_short_regress.q.out | 74 +- .../spark/vectorized_nested_mapjoin.q.out | 17 +- .../spark/vectorized_timestamp_funcs.q.out | 12 +- .../clientpositive/stats_only_null.q.out | 8 +- .../results/clientpositive/stats_ppr_all.q.out | 16 +- .../subq_where_serialization.q.out | 18 +- .../clientpositive/subquery_exists_having.q.out | 48 +- .../results/clientpositive/subquery_in.q.out | 36 +- .../clientpositive/subquery_in_having.q.out | 260 +- .../clientpositive/subquery_notexists.q.out | 18 +- .../subquery_notexists_having.q.out | 26 +- .../results/clientpositive/subquery_notin.q.out | 24 +- .../subquery_notin_having.q.java1.7.out | 50 +- .../subquery_unqualcolumnrefs.q.out | 74 +- .../results/clientpositive/subquery_views.q.out | 124 +- .../test/results/clientpositive/tez/count.q.out | 14 +- .../tez/dynamic_partition_pruning.q.out | 88 +- .../tez/dynpart_sort_opt_vectorization.q.out | 90 +- .../tez/dynpart_sort_optimization.q.out | 89 +- .../clientpositive/tez/explainuser_1.q.out | 2319 +++++++++--------- .../clientpositive/tez/explainuser_2.q.out | 782 +++--- .../results/clientpositive/tez/having.q.out | 62 +- .../clientpositive/tez/limit_pushdown.q.out | 34 +- .../clientpositive/tez/mapjoin_mapjoin.q.out | 24 +- .../tez/metadata_only_queries.q.out | 4 +- .../clientpositive/tez/metadataonly1.q.out | 44 +- .../test/results/clientpositive/tez/mrr.q.out | 94 +- .../clientpositive/tez/selectDistinctStar.q.out | 44 +- .../tez/show_create_database.q.out | 19 + .../clientpositive/tez/stats_only_null.q.out | 8 +- .../clientpositive/tez/subquery_in.q.out | 36 +- .../results/clientpositive/tez/tez_dml.q.out | 6 +- .../results/clientpositive/tez/union5.q.out | 44 +- .../results/clientpositive/tez/union7.q.out | 28 +- .../clientpositive/tez/unionDistinct_1.q.out | 8 +- .../clientpositive/tez/vector_aggregate_9.q.out | 4 +- .../tez/vector_binary_join_groupby.q.out | 4 +- .../clientpositive/tez/vector_char_cast.q.out | 35 + .../tez/vector_count_distinct.q.out | 4 +- .../tez/vector_decimal_aggregate.q.out | 12 +- .../tez/vector_decimal_precision.q.out | 4 +- .../clientpositive/tez/vector_decimal_udf.q.out | 30 +- .../clientpositive/tez/vector_distinct_2.q.out | 28 +- .../clientpositive/tez/vector_groupby_3.q.out | 30 +- .../tez/vector_groupby_reduce.q.out | 8 +- .../tez/vector_grouping_sets.q.out | 8 +- .../tez/vector_mapjoin_reduce.q.out | 36 +- .../clientpositive/tez/vector_orderby_5.q.out | 6 +- .../clientpositive/tez/vector_outer_join2.q.out | 20 +- .../tez/vector_partition_diff_num_cols.q.out | 20 +- .../tez/vector_partitioned_date_time.q.out | 12 +- .../tez/vector_reduce_groupby_decimal.q.out | 24 +- .../clientpositive/tez/vectorization_0.q.out | 16 +- .../clientpositive/tez/vectorization_13.q.out | 32 +- .../clientpositive/tez/vectorization_15.q.out | 16 +- .../clientpositive/tez/vectorization_16.q.out | 16 +- .../clientpositive/tez/vectorization_9.q.out | 16 +- .../tez/vectorization_limit.q.out | 14 +- .../tez/vectorization_pushdown.q.out | 4 +- .../tez/vectorization_short_regress.q.out | 74 +- .../tez/vectorized_distinct_gby.q.out | 8 +- .../vectorized_dynamic_partition_pruning.q.out | 88 +- .../tez/vectorized_nested_mapjoin.q.out | 18 +- .../clientpositive/tez/vectorized_parquet.q.out | 6 +- .../tez/vectorized_timestamp_funcs.q.out | 12 +- ql/src/test/results/clientpositive/udf8.q.out | 4 +- .../test/results/clientpositive/udf_count.q.out | 16 +- .../test/results/clientpositive/union11.q.out | 70 +- .../test/results/clientpositive/union14.q.out | 32 +- .../test/results/clientpositive/union15.q.out | 38 +- .../test/results/clientpositive/union28.q.out | 8 +- .../test/results/clientpositive/union30.q.out | 8 +- .../test/results/clientpositive/union33.q.out | 8 +- ql/src/test/results/clientpositive/union5.q.out | 48 +- ql/src/test/results/clientpositive/union7.q.out | 32 +- .../clientpositive/unionDistinct_1.q.out | 8 +- .../clientpositive/union_remove_21.q.out | 8 +- .../clientpositive/vector_aggregate_9.q.out | 4 +- .../vector_aggregate_without_gby.q.out | 4 +- .../vector_binary_join_groupby.q.out | 4 +- .../clientpositive/vector_char_cast.q.out | 35 + .../clientpositive/vector_count_distinct.q.out | 6 +- .../vector_decimal_aggregate.q.out | 12 +- .../vector_decimal_precision.q.out | 4 +- .../clientpositive/vector_decimal_udf.q.out | 30 +- .../clientpositive/vector_distinct_2.q.out | 28 +- .../clientpositive/vector_groupby_3.q.out | 30 +- .../clientpositive/vector_groupby_reduce.q.out | 8 +- .../clientpositive/vector_grouping_sets.q.out | 8 +- .../clientpositive/vector_left_outer_join.q.out | 8 +- .../clientpositive/vector_mapjoin_reduce.q.out | 36 +- .../clientpositive/vector_orderby_5.q.out | 6 +- .../clientpositive/vector_outer_join1.q.out | 8 +- .../clientpositive/vector_outer_join2.q.out | 28 +- .../clientpositive/vector_outer_join3.q.out | 24 +- .../clientpositive/vector_outer_join4.q.out | 8 +- .../clientpositive/vector_outer_join5.q.out | 48 +- .../vector_partition_diff_num_cols.q.out | 20 +- .../vector_partitioned_date_time.q.out | 12 +- .../vector_reduce_groupby_decimal.q.out | 24 +- .../clientpositive/vectorization_0.q.out | 16 +- .../clientpositive/vectorization_13.q.out | 32 +- .../clientpositive/vectorization_15.q.out | 16 +- .../clientpositive/vectorization_16.q.out | 16 +- .../clientpositive/vectorization_9.q.out | 16 +- .../clientpositive/vectorization_limit.q.out | 16 +- .../clientpositive/vectorization_pushdown.q.out | 4 +- .../vectorization_short_regress.q.out | 74 +- .../vectorized_distinct_gby.q.out | 12 +- .../vectorized_nested_mapjoin.q.out | 26 +- .../clientpositive/vectorized_parquet.q.out | 6 +- .../vectorized_parquet_types.q.out | 6 +- .../vectorized_timestamp_funcs.q.out | 12 +- .../results/clientpositive/windowing_udaf.q.out | 12 + .../hive/serde2/ColumnProjectionUtils.java | 22 + .../hadoop/hive/serde2/lazy/LazyByte.java | 4 + .../hadoop/hive/serde2/lazy/LazyDouble.java | 4 + .../hadoop/hive/serde2/lazy/LazyFloat.java | 4 + .../hadoop/hive/serde2/lazy/LazyInteger.java | 4 + .../hadoop/hive/serde2/lazy/LazyLong.java | 4 + .../hadoop/hive/serde2/lazy/LazyShort.java | 4 + .../hadoop/hive/serde2/lazy/LazyUtils.java | 28 + .../org/apache/hive/service/cli/Column.java | 2 +- .../org/apache/hive/service/cli/TestColumn.java | 129 + .../hive/ql/io/sarg/SearchArgumentFactory.java | 5 +- .../hive/ql/io/sarg/SearchArgumentImpl.java | 7 +- 302 files changed, 7055 insertions(+), 5389 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/f324305a/itests/src/test/resources/testconfiguration.properties ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/f324305a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/f324305a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java ---------------------------------------------------------------------- diff --cc llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java index 4f7bb78,0000000..c934f39 mode 100644,000000..100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java @@@ -1,949 -1,0 +1,950 @@@ +package org.apache.hadoop.hive.llap.io.encoded; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.CallableWithNdc; +import org.apache.hadoop.hive.common.Pool; +import org.apache.hadoop.hive.common.Pool.PoolObjectHelper; +import org.apache.hadoop.hive.common.io.DataCache; +import org.apache.hadoop.hive.common.io.Allocator; +import org.apache.hadoop.hive.common.io.encoded.EncodedColumnBatch.ColumnStreamData; +import org.apache.hadoop.hive.common.io.DiskRange; +import org.apache.hadoop.hive.common.io.DiskRangeList; +import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.llap.ConsumerFeedback; +import org.apache.hadoop.hive.llap.DebugUtils; +import org.apache.hadoop.hive.llap.cache.Cache; +import org.apache.hadoop.hive.llap.cache.LowLevelCache; +import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority; +import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters; +import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters.Counter; +import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl; +import org.apache.hadoop.hive.llap.io.decode.OrcEncodedDataConsumer; +import org.apache.hadoop.hive.llap.io.metadata.OrcFileMetadata; +import org.apache.hadoop.hive.llap.io.metadata.OrcMetadataCache; +import org.apache.hadoop.hive.llap.io.metadata.OrcStripeMetadata; +import org.apache.hadoop.hive.ql.exec.DDLTask; +import org.apache.hadoop.hive.ql.io.AcidUtils; +import org.apache.hadoop.hive.ql.io.HdfsUtils; +import org.apache.hadoop.hive.ql.io.orc.CompressionKind; +import org.apache.hadoop.hive.ql.io.orc.DataReader; +import org.apache.hadoop.hive.ql.io.orc.MetadataReader; +import org.apache.hadoop.hive.ql.io.orc.OrcFile; +import org.apache.hadoop.hive.ql.io.orc.OrcFile.ReaderOptions; +import org.apache.hadoop.hive.ql.io.orc.OrcConf; +import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat; +import org.apache.hadoop.hive.ql.io.orc.OrcProto; +import org.apache.hadoop.hive.ql.io.orc.OrcSplit; +import org.apache.hadoop.hive.ql.io.orc.encoded.Reader; +import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl; +import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.SargApplier; +import org.apache.hadoop.hive.ql.io.orc.encoded.Consumer; +import org.apache.hadoop.hive.ql.io.orc.encoded.EncodedOrcFile; +import org.apache.hadoop.hive.ql.io.orc.encoded.EncodedReader; +import org.apache.hadoop.hive.ql.io.orc.encoded.OrcBatchKey; +import org.apache.hadoop.hive.ql.io.orc.encoded.OrcCacheKey; +import org.apache.hadoop.hive.ql.io.orc.encoded.Reader.OrcEncodedColumnBatch; +import org.apache.hadoop.hive.ql.io.orc.encoded.Reader.PoolFactory; +import org.apache.hadoop.hive.ql.io.orc.RecordReaderUtils; +import org.apache.hadoop.hive.ql.io.orc.StripeInformation; +import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hive.common.util.FixedSizedObjectPool; + +/** + * This produces EncodedColumnBatch via ORC EncodedDataImpl. + * It serves as Consumer for EncodedColumnBatch too, for the high-level cache scenario where + * it inserts itself into the pipeline to put the data in cache, before passing it to the real + * consumer. It also serves as ConsumerFeedback that receives processed EncodedColumnBatch-es. + */ +public class OrcEncodedDataReader extends CallableWithNdc<Void> + implements ConsumerFeedback<OrcEncodedColumnBatch>, Consumer<OrcEncodedColumnBatch> { + private static final Log LOG = LogFactory.getLog(OrcEncodedDataReader.class); + public static final FixedSizedObjectPool<ColumnStreamData> CSD_POOL = + new FixedSizedObjectPool<>(8192, new PoolObjectHelper<ColumnStreamData>() { + @Override + public ColumnStreamData create() { + return new ColumnStreamData(); + } + @Override + public void resetBeforeOffer(ColumnStreamData t) { + t.reset(); + } + }); + public static final FixedSizedObjectPool<OrcEncodedColumnBatch> ECB_POOL = + new FixedSizedObjectPool<>(1024, new PoolObjectHelper<OrcEncodedColumnBatch>() { + @Override + public OrcEncodedColumnBatch create() { + return new OrcEncodedColumnBatch(); + } + @Override + public void resetBeforeOffer(OrcEncodedColumnBatch t) { + t.reset(); + } + }); + private static final PoolFactory POOL_FACTORY = new PoolFactory() { + @Override + public <T> Pool<T> createPool(int size, PoolObjectHelper<T> helper) { + return new FixedSizedObjectPool<>(size, helper); + } + + @Override + public Pool<ColumnStreamData> createColumnStreamDataPool() { + return CSD_POOL; + } + + @Override + public Pool<OrcEncodedColumnBatch> createEncodedColumnBatchPool() { + return ECB_POOL; + } + }; + + private final OrcMetadataCache metadataCache; + private final LowLevelCache lowLevelCache; + private final Configuration conf; + private final Cache<OrcCacheKey> cache; + private final FileSplit split; + private List<Integer> columnIds; + private final SearchArgument sarg; + private final String[] columnNames; + private final OrcEncodedDataConsumer consumer; + private final QueryFragmentCounters counters; + + // Read state. + private int stripeIxFrom; + private OrcFileMetadata fileMetadata; + private Reader orcReader; + private MetadataReader metadataReader; + private EncodedReader stripeReader; + private long fileId; + private FileSystem fs; + /** + * readState[stripeIx'][colIx'] => boolean array (could be a bitmask) of rg-s that need to be + * read. Contains only stripes that are read, and only columns included. null => read all RGs. + */ + private boolean[][][] readState; + private volatile boolean isStopped = false; + @SuppressWarnings("unused") + private volatile boolean isPaused = false; + + public OrcEncodedDataReader(LowLevelCache lowLevelCache, Cache<OrcCacheKey> cache, + OrcMetadataCache metadataCache, Configuration conf, InputSplit split, + List<Integer> columnIds, SearchArgument sarg, String[] columnNames, + OrcEncodedDataConsumer consumer, QueryFragmentCounters counters) { + this.lowLevelCache = lowLevelCache; + this.metadataCache = metadataCache; + this.cache = cache; + this.conf = conf; + this.split = (FileSplit)split; + this.columnIds = columnIds; + if (this.columnIds != null) { + Collections.sort(this.columnIds); + } + this.sarg = sarg; + this.columnNames = columnNames; + this.consumer = consumer; + this.counters = counters; + } + + @Override + public void stop() { + if (LOG.isInfoEnabled()) { + LOG.info("Encoded reader is being stopped"); + } + isStopped = true; + } + + @Override + public void pause() { + isPaused = true; + // TODO: pause fetching + } + + @Override + public void unpause() { + isPaused = false; + // TODO: unpause fetching + } + + @Override + protected Void callInternal() throws IOException { + long startTime = counters.startTimeCounter(); + if (LlapIoImpl.LOGL.isInfoEnabled()) { + LlapIoImpl.LOG.info("Processing data for " + split.getPath()); + } + if (processStop()) { + recordReaderTime(startTime); + return null; + } + counters.setDesc(QueryFragmentCounters.Desc.TABLE, getDbAndTableName(split.getPath())); + orcReader = null; + // 1. Get file metadata from cache, or create the reader and read it. + // Don't cache the filesystem object for now; Tez closes it and FS cache will fix all that + fs = split.getPath().getFileSystem(conf); + fileId = determineFileId(fs, split); + counters.setDesc(QueryFragmentCounters.Desc.FILE, fileId); + + try { + fileMetadata = getOrReadFileMetadata(); + consumer.setFileMetadata(fileMetadata); + validateFileMetadata(); + if (columnIds == null) { + columnIds = createColumnIds(fileMetadata); + } + + // 2. Determine which stripes to read based on the split. + determineStripesToRead(); + } catch (Throwable t) { + recordReaderTime(startTime); + consumer.setError(t); + return null; + } + + if (readState.length == 0) { + consumer.setDone(); + recordReaderTime(startTime); + return null; // No data to read. + } + counters.setDesc(QueryFragmentCounters.Desc.STRIPES, stripeIxFrom + "," + readState.length); + + // 3. Apply SARG if needed, and otherwise determine what RGs to read. + int stride = fileMetadata.getRowIndexStride(); + ArrayList<OrcStripeMetadata> stripeMetadatas = null; + boolean[] globalIncludes = null; + boolean[] sargColumns = null; + try { + globalIncludes = OrcInputFormat.genIncludedColumns(fileMetadata.getTypes(), columnIds, true); + if (sarg != null && stride != 0) { + // TODO: move this to a common method - int[] filterColumns = RecordReaderImpl.mapSargColumns(sarg.getLeaves(), columnNames, 0); ++ int[] filterColumns = RecordReaderImpl.mapSargColumnsToOrcInternalColIdx( ++ sarg.getLeaves(), columnNames, 0); + // included will not be null, row options will fill the array with trues if null + sargColumns = new boolean[globalIncludes.length]; + for (int i : filterColumns) { + // filter columns may have -1 as index which could be partition column in SARG. + if (i > 0) { + sargColumns[i] = true; + } + } + + // If SARG is present, get relevant stripe metadata from cache or readers. + stripeMetadatas = readStripesMetadata(globalIncludes, sargColumns); + } + + // Now, apply SARG if any; w/o sarg, this will just initialize readState. + boolean hasData = determineRgsToRead(globalIncludes, stride, stripeMetadatas); + if (!hasData) { + consumer.setDone(); + recordReaderTime(startTime); + return null; // No data to read. + } + } catch (Throwable t) { + cleanupReaders(); + consumer.setError(t); + recordReaderTime(startTime); + return null; + } + + if (processStop()) { + cleanupReaders(); + recordReaderTime(startTime); + return null; + } + + // 4. Get data from high-level cache. + // If some cols are fully in cache, this will also give us the modified list of columns to + // read for every stripe (null means read all of them - the usual path). In any case, + // readState will be modified for column x rgs that were fetched from high-level cache. + List<Integer>[] stripeColsToRead = null; + if (cache != null) { + try { + stripeColsToRead = produceDataFromCache(stride); + } catch (Throwable t) { + // produceDataFromCache handles its own cleanup. + consumer.setError(t); + cleanupReaders(); + recordReaderTime(startTime); + return null; + } + } + + // 5. Create encoded data reader. + // In case if we have high-level cache, we will intercept the data and add it there; + // otherwise just pass the data directly to the consumer. + Consumer<OrcEncodedColumnBatch> dataConsumer = (cache == null) ? this.consumer : this; + try { + ensureOrcReader(); + // Reader creating updates HDFS counters, don't do it here. + DataWrapperForOrc dw = new DataWrapperForOrc(); + stripeReader = orcReader.encodedReader(fileId, dw, dw, POOL_FACTORY); + stripeReader.setDebugTracing(DebugUtils.isTraceOrcEnabled()); + } catch (Throwable t) { + consumer.setError(t); + recordReaderTime(startTime); + cleanupReaders(); + return null; + } + + // 6. Read data. + // TODO: I/O threadpool could be here - one thread per stripe; for now, linear. + OrcBatchKey stripeKey = new OrcBatchKey(fileId, -1, 0); + for (int stripeIxMod = 0; stripeIxMod < readState.length; ++stripeIxMod) { + if (processStop()) { + cleanupReaders(); + recordReaderTime(startTime); + return null; + } + int stripeIx = stripeIxFrom + stripeIxMod; + boolean[][] colRgs = null; + boolean[] stripeIncludes = null; + OrcStripeMetadata stripeMetadata = null; + StripeInformation stripe; + try { + List<Integer> cols = stripeColsToRead == null ? null : stripeColsToRead[stripeIxMod]; + if (cols != null && cols.isEmpty()) continue; // No need to read this stripe. + stripe = fileMetadata.getStripes().get(stripeIx); + + if (DebugUtils.isTraceOrcEnabled()) { + LlapIoImpl.LOG.info("Reading stripe " + stripeIx + ": " + + stripe.getOffset() + ", " + stripe.getLength()); + } + colRgs = readState[stripeIxMod]; + // We assume that NO_RGS value is only set from SARG filter and for all columns; + // intermediate changes for individual columns will unset values in the array. + // Skip this case for 0-column read. We could probably special-case it just like we do + // in EncodedReaderImpl, but for now it's not that important. + if (colRgs.length > 0 && colRgs[0] == SargApplier.READ_NO_RGS) continue; + + // 6.1. Determine the columns to read (usually the same as requested). + if (cache == null || cols == null || cols.size() == colRgs.length) { + cols = columnIds; + stripeIncludes = globalIncludes; + } else { + // We are reading subset of the original columns, remove unnecessary bitmasks/etc. + // This will never happen w/o high-level cache. + stripeIncludes = OrcInputFormat.genIncludedColumns(fileMetadata.getTypes(), cols, true); + colRgs = genStripeColRgs(cols, colRgs); + } + + // 6.2. Ensure we have stripe metadata. We might have read it before for RG filtering. + boolean isFoundInCache = false; + if (stripeMetadatas != null) { + stripeMetadata = stripeMetadatas.get(stripeIxMod); + } else { + stripeKey.stripeIx = stripeIx; + stripeMetadata = metadataCache.getStripeMetadata(stripeKey); + isFoundInCache = (stripeMetadata != null); + if (!isFoundInCache) { + counters.incrCounter(Counter.METADATA_CACHE_MISS); + ensureMetadataReader(); + long startTimeHdfs = counters.startTimeCounter(); + stripeMetadata = new OrcStripeMetadata( + stripeKey, metadataReader, stripe, stripeIncludes, sargColumns); + counters.incrTimeCounter(Counter.HDFS_TIME_US, startTimeHdfs); + stripeMetadata = metadataCache.putStripeMetadata(stripeMetadata); + if (DebugUtils.isTraceOrcEnabled()) { + LlapIoImpl.LOG.info("Caching stripe " + stripeKey.stripeIx + + " metadata with includes: " + DebugUtils.toString(stripeIncludes)); + } + stripeKey = new OrcBatchKey(fileId, -1, 0); + } + consumer.setStripeMetadata(stripeMetadata); + } + if (!stripeMetadata.hasAllIndexes(stripeIncludes)) { + if (DebugUtils.isTraceOrcEnabled()) { + LlapIoImpl.LOG.info("Updating indexes in stripe " + stripeKey.stripeIx + + " metadata for includes: " + DebugUtils.toString(stripeIncludes)); + } + assert isFoundInCache; + counters.incrCounter(Counter.METADATA_CACHE_MISS); + ensureMetadataReader(); + updateLoadedIndexes(stripeMetadata, stripe, stripeIncludes, sargColumns); + } else if (isFoundInCache) { + counters.incrCounter(Counter.METADATA_CACHE_HIT); + } + } catch (Throwable t) { + consumer.setError(t); + cleanupReaders(); + recordReaderTime(startTime); + return null; + } + if (processStop()) { + cleanupReaders(); + recordReaderTime(startTime); + return null; + } + + // 6.3. Finally, hand off to the stripe reader to produce the data. + // This is a sync call that will feed data to the consumer. + try { + // TODO: readEncodedColumns is not supposed to throw; errors should be propagated thru + // consumer. It is potentially holding locked buffers, and must perform its own cleanup. + // Also, currently readEncodedColumns is not stoppable. The consumer will discard the + // data it receives for one stripe. We could probably interrupt it, if it checked that. + stripeReader.readEncodedColumns(stripeIx, stripe, stripeMetadata.getRowIndexes(), + stripeMetadata.getEncodings(), stripeMetadata.getStreams(), stripeIncludes, + colRgs, dataConsumer); + } catch (Throwable t) { + consumer.setError(t); + cleanupReaders(); + recordReaderTime(startTime); + return null; + } + } + + // Done with all the things. + recordReaderTime(startTime); + dataConsumer.setDone(); + if (DebugUtils.isTraceMttEnabled()) { + LlapIoImpl.LOG.info("done processing " + split); + } + + // Close the stripe reader, we are done reading. + cleanupReaders(); + return null; + } + + private void recordReaderTime(long startTime) { + counters.incrTimeCounter(Counter.TOTAL_IO_TIME_US, startTime); + } + + private static String getDbAndTableName(Path path) { + // Ideally, we'd get this from split; however, split doesn't contain any such thing and it's + // actually pretty hard to get cause even split generator only uses paths. We only need this + // for metrics; therefore, brace for BLACK MAGIC! + String[] parts = path.toUri().getPath().toString().split(Path.SEPARATOR); + int dbIx = -1; + // Try to find the default db postfix; don't check two last components - at least there + // should be a table and file (we could also try to throw away partition/bucket/acid stuff). + for (int i = 0; i < parts.length - 2; ++i) { + if (!parts[i].endsWith(DDLTask.DATABASE_PATH_SUFFIX)) continue; + if (dbIx >= 0) { + dbIx = -1; // Let's not guess. + break; + } + dbIx = i; + } + if (dbIx >= 0) { + return parts[dbIx].substring(0, parts[dbIx].length() - 3) + "." + parts[dbIx + 1]; + } + + // Just go from the back and throw away everything we think is wrong; skip last item, the file. + boolean isInPartFields = false; + for (int i = parts.length - 2; i >= 0; --i) { + String p = parts[i]; + boolean isPartField = p.contains("="); + if ((isInPartFields && !isPartField) || (!isPartField && !p.startsWith(AcidUtils.BASE_PREFIX) + && !p.startsWith(AcidUtils.DELTA_PREFIX) && !p.startsWith(AcidUtils.BUCKET_PREFIX))) { + dbIx = i - 1; + break; + } + isInPartFields = isPartField; + } + // If we found something before we ran out of components, use it. + if (dbIx >= 0) { + String dbName = parts[dbIx]; + if (dbName.endsWith(DDLTask.DATABASE_PATH_SUFFIX)) { + dbName = dbName.substring(0, dbName.length() - 3); + } + return dbName + "." + parts[dbIx + 1]; + } + return "unknown"; + } + + private void validateFileMetadata() throws IOException { + if (fileMetadata.getCompressionKind() == CompressionKind.NONE) return; + int bufferSize = fileMetadata.getCompressionBufferSize(); + int minAllocSize = HiveConf.getIntVar(conf, HiveConf.ConfVars.LLAP_ORC_CACHE_MIN_ALLOC); + if (bufferSize < minAllocSize) { + LOG.warn("ORC compression buffer size (" + bufferSize + ") is smaller than LLAP low-level " + + "cache minimum allocation size (" + minAllocSize + "). Decrease the value for " + + HiveConf.ConfVars.LLAP_ORC_CACHE_MIN_ALLOC.toString() + " to avoid wasting memory"); + } + } + + private boolean processStop() { + if (!isStopped) return false; + LOG.info("Encoded data reader is stopping"); + cleanupReaders(); + return true; + } + + private static long determineFileId(FileSystem fs, FileSplit split) throws IOException { + if (split instanceof OrcSplit) { + Long fileId = ((OrcSplit)split).getFileId(); + if (fileId != null) { + return fileId; + } + } + LOG.warn("Split for " + split.getPath() + " (" + split.getClass() + ") does not have file ID"); + return HdfsUtils.getFileId(fs, split.getPath()); + } + + private boolean[][] genStripeColRgs(List<Integer> stripeCols, boolean[][] globalColRgs) { + boolean[][] stripeColRgs = new boolean[stripeCols.size()][]; + for (int i = 0, i2 = -1; i < globalColRgs.length; ++i) { + if (globalColRgs[i] == null) continue; + stripeColRgs[i2] = globalColRgs[i]; + ++i2; + } + return stripeColRgs; + } + + /** + * Puts all column indexes from metadata to make a column list to read all column. + */ + private static List<Integer> createColumnIds(OrcFileMetadata metadata) { + List<Integer> columnIds = new ArrayList<Integer>(metadata.getTypes().size()); + for (int i = 1; i < metadata.getTypes().size(); ++i) { + columnIds.add(i); + } + return columnIds; + } + + /** + * In case if stripe metadata in cache does not have all indexes for current query, load + * the missing one. This is a temporary cludge until real metadata cache becomes available. + */ + private void updateLoadedIndexes(OrcStripeMetadata stripeMetadata, + StripeInformation stripe, boolean[] stripeIncludes, boolean[] sargColumns) throws IOException { + // We only synchronize on write for now - design of metadata cache is very temporary; + // we pre-allocate the array and never remove entries; so readers should be safe. + synchronized (stripeMetadata) { + if (stripeMetadata.hasAllIndexes(stripeIncludes)) return; + long startTime = counters.startTimeCounter(); + stripeMetadata.loadMissingIndexes(metadataReader, stripe, stripeIncludes, sargColumns); + counters.incrTimeCounter(Counter.HDFS_TIME_US, startTime); + } + } + + /** + * Closes the stripe readers (on error). + */ + private void cleanupReaders() { + if (metadataReader != null) { + try { + metadataReader.close(); + } catch (IOException ex) { + // Ignore. + } + } + if (stripeReader != null) { + try { + stripeReader.close(); + } catch (IOException ex) { + // Ignore. + } + } + } + + /** + * Ensures orcReader is initialized for the split. + */ + private void ensureOrcReader() throws IOException { + if (orcReader != null) return; + Path path = HdfsUtils.getFileIdPath(fs, split.getPath(), fileId); + if (DebugUtils.isTraceOrcEnabled()) { + LOG.info("Creating reader for " + path + " (" + split.getPath() + ")"); + } + long startTime = counters.startTimeCounter(); + ReaderOptions opts = OrcFile.readerOptions(conf).filesystem(fs).fileMetadata(fileMetadata); + orcReader = EncodedOrcFile.createReader(path, opts); + counters.incrTimeCounter(Counter.HDFS_TIME_US, startTime); + } + + /** + * Gets file metadata for the split from cache, or reads it from the file. + */ + private OrcFileMetadata getOrReadFileMetadata() throws IOException { + OrcFileMetadata metadata = metadataCache.getFileMetadata(fileId); + if (metadata != null) { + counters.incrCounter(Counter.METADATA_CACHE_HIT); + return metadata; + } + counters.incrCounter(Counter.METADATA_CACHE_MISS); + ensureOrcReader(); + // We assume this call doesn't touch HDFS because everything is already read; don't add time. + metadata = new OrcFileMetadata(fileId, orcReader); + return metadataCache.putFileMetadata(metadata); + } + + /** + * Reads the metadata for all stripes in the file. + */ + private ArrayList<OrcStripeMetadata> readStripesMetadata( + boolean[] globalInc, boolean[] sargColumns) throws IOException { + ArrayList<OrcStripeMetadata> result = new ArrayList<OrcStripeMetadata>(readState.length); + OrcBatchKey stripeKey = new OrcBatchKey(fileId, 0, 0); + for (int stripeIxMod = 0; stripeIxMod < readState.length; ++stripeIxMod) { + stripeKey.stripeIx = stripeIxMod + stripeIxFrom; + OrcStripeMetadata value = metadataCache.getStripeMetadata(stripeKey); + if (value == null || !value.hasAllIndexes(globalInc)) { + counters.incrCounter(Counter.METADATA_CACHE_MISS); + ensureMetadataReader(); + StripeInformation si = fileMetadata.getStripes().get(stripeKey.stripeIx); + if (value == null) { + long startTime = counters.startTimeCounter(); + value = new OrcStripeMetadata(stripeKey, metadataReader, si, globalInc, sargColumns); + counters.incrTimeCounter(Counter.HDFS_TIME_US, startTime); + value = metadataCache.putStripeMetadata(value); + if (DebugUtils.isTraceOrcEnabled()) { + LlapIoImpl.LOG.info("Caching stripe " + stripeKey.stripeIx + + " metadata with includes: " + DebugUtils.toString(globalInc)); + } + // Create new key object to reuse for gets; we've used the old one to put in cache. + stripeKey = new OrcBatchKey(fileId, 0, 0); + } + // We might have got an old value from cache; recheck it has indexes. + if (!value.hasAllIndexes(globalInc)) { + if (DebugUtils.isTraceOrcEnabled()) { + LlapIoImpl.LOG.info("Updating indexes in stripe " + stripeKey.stripeIx + + " metadata for includes: " + DebugUtils.toString(globalInc)); + } + updateLoadedIndexes(value, si, globalInc, sargColumns); + } + } else { + counters.incrCounter(Counter.METADATA_CACHE_HIT); + } + result.add(value); + consumer.setStripeMetadata(value); + } + return result; + } + + private void ensureMetadataReader() throws IOException { + ensureOrcReader(); + if (metadataReader != null) return; + long startTime = counters.startTimeCounter(); + metadataReader = orcReader.metadata(); + counters.incrTimeCounter(Counter.HDFS_TIME_US, startTime); + } + + @Override + public void returnData(OrcEncodedColumnBatch ecb) { + for (ColumnStreamData[] datas : ecb.getColumnData()) { + if (datas == null) continue; + for (ColumnStreamData data : datas) { + if (data == null || data.decRef() != 0) continue; + if (DebugUtils.isTraceLockingEnabled()) { + for (MemoryBuffer buf : data.getCacheBuffers()) { + LlapIoImpl.LOG.info("Unlocking " + buf + " at the end of processing"); + } + } + lowLevelCache.releaseBuffers(data.getCacheBuffers()); + CSD_POOL.offer(data); + } + } + // We can offer ECB even with some streams not discarded; reset() will clear the arrays. + ECB_POOL.offer(ecb); + } + + /** + * Determines which RGs need to be read, after stripes have been determined. + * SARG is applied, and readState is populated for each stripe accordingly. + * @param stripes All stripes in the file (field state is used to determine stripes to read). + */ + private boolean determineRgsToRead(boolean[] globalIncludes, int rowIndexStride, + ArrayList<OrcStripeMetadata> metadata) throws IOException { + SargApplier sargApp = null; + if (sarg != null && rowIndexStride != 0) { + List<OrcProto.Type> types = fileMetadata.getTypes(); + String[] colNamesForSarg = OrcInputFormat.getSargColumnNames( + columnNames, types, globalIncludes, fileMetadata.isOriginalFormat()); + sargApp = new SargApplier(sarg, colNamesForSarg, rowIndexStride, types, globalIncludes.length); + } + boolean hasAnyData = false; + // readState should have been initialized by this time with an empty array. + for (int stripeIxMod = 0; stripeIxMod < readState.length; ++stripeIxMod) { + int stripeIx = stripeIxMod + stripeIxFrom; + StripeInformation stripe = fileMetadata.getStripes().get(stripeIx); + int rgCount = getRgCount(stripe, rowIndexStride); + boolean[] rgsToRead = null; + if (sargApp != null) { + OrcStripeMetadata stripeMetadata = metadata.get(stripeIxMod); + rgsToRead = sargApp.pickRowGroups(stripe, stripeMetadata.getRowIndexes(), + stripeMetadata.getBloomFilterIndexes(), true); + } + boolean isNone = rgsToRead == SargApplier.READ_NO_RGS, + isAll = rgsToRead == SargApplier.READ_ALL_RGS; + hasAnyData = hasAnyData || !isNone; + if (DebugUtils.isTraceOrcEnabled()) { + if (isNone) { + LlapIoImpl.LOG.info("SARG eliminated all RGs for stripe " + stripeIx); + } else if (!isAll) { + LlapIoImpl.LOG.info("SARG picked RGs for stripe " + stripeIx + ": " + + DebugUtils.toString(rgsToRead)); + } else { + LlapIoImpl.LOG.info("Will read all " + rgCount + " RGs for stripe " + stripeIx); + } + } + assert isAll || isNone || rgsToRead.length == rgCount; + readState[stripeIxMod] = new boolean[columnIds.size()][]; + for (int j = 0; j < columnIds.size(); ++j) { + readState[stripeIxMod][j] = (isAll || isNone) ? rgsToRead : + Arrays.copyOf(rgsToRead, rgsToRead.length); + } + + adjustRgMetric(rgCount, rgsToRead, isNone, isAll); + } + return hasAnyData; + } + + private void adjustRgMetric(int rgCount, boolean[] rgsToRead, boolean isNone, + boolean isAll) { + int count = 0; + if (!isAll) { + for (boolean b : rgsToRead) { + if (b) + count++; + } + } else if (!isNone) { + count = rgCount; + } + counters.setCounter(QueryFragmentCounters.Counter.SELECTED_ROWGROUPS, count); + } + + + private int getRgCount(StripeInformation stripe, int rowIndexStride) { + return (int)Math.ceil((double)stripe.getNumberOfRows() / rowIndexStride); + } + + /** + * Determine which stripes to read for a split. Populates stripeIxFrom and readState. + */ + public void determineStripesToRead() { + // The unit of caching for ORC is (rg x column) (see OrcBatchKey). + List<StripeInformation> stripes = fileMetadata.getStripes(); + long offset = split.getStart(), maxOffset = offset + split.getLength(); + stripeIxFrom = -1; + int stripeIxTo = -1; + if (LlapIoImpl.LOGL.isDebugEnabled()) { + String tmp = "FileSplit {" + split.getStart() + ", " + split.getLength() + "}; stripes "; + for (StripeInformation stripe : stripes) { + tmp += "{" + stripe.getOffset() + ", " + stripe.getLength() + "}, "; + } + LlapIoImpl.LOG.debug(tmp); + } + + int stripeIx = 0; + for (StripeInformation stripe : stripes) { + long stripeStart = stripe.getOffset(); + if (offset > stripeStart) { + // We assume splits will never start in the middle of the stripe. + ++stripeIx; + continue; + } + if (stripeIxFrom == -1) { + if (DebugUtils.isTraceOrcEnabled()) { + LlapIoImpl.LOG.info("Including stripes from " + stripeIx + + " (" + stripeStart + " >= " + offset + ")"); + } + stripeIxFrom = stripeIx; + } + if (stripeStart >= maxOffset) { + stripeIxTo = stripeIx; + if (DebugUtils.isTraceOrcEnabled()) { + LlapIoImpl.LOG.info("Including stripes until " + stripeIxTo + " (" + stripeStart + + " >= " + maxOffset + "); " + (stripeIxTo - stripeIxFrom) + " stripes"); + } + break; + } + ++stripeIx; + } + if (stripeIxTo == -1) { + stripeIxTo = stripeIx; + if (DebugUtils.isTraceOrcEnabled()) { + LlapIoImpl.LOG.info("Including stripes until " + stripeIx + " (end of file); " + + (stripeIxTo - stripeIxFrom) + " stripes"); + } + } + readState = new boolean[stripeIxTo - stripeIxFrom][][]; + } + + // TODO: split by stripe? we do everything by stripe, and it might be faster + /** + * Takes the data from high-level cache for all stripes and returns to consumer. + * @return List of columns to read per stripe, if any columns were fully eliminated by cache. + */ + private List<Integer>[] produceDataFromCache(int rowIndexStride) throws IOException { + OrcCacheKey key = new OrcCacheKey(fileId, -1, -1, -1); + // For each stripe, keep a list of columns that are not fully in cache (null => all of them). + @SuppressWarnings("unchecked") + List<Integer>[] stripeColsNotInCache = new List[readState.length]; + for (int stripeIxMod = 0; stripeIxMod < readState.length; ++stripeIxMod) { + key.stripeIx = stripeIxFrom + stripeIxMod; + boolean[][] cols = readState[stripeIxMod]; + boolean[] isMissingAnyRgs = new boolean[cols.length]; + int totalRgCount = getRgCount(fileMetadata.getStripes().get(key.stripeIx), rowIndexStride); + for (int rgIx = 0; rgIx < totalRgCount; ++rgIx) { + OrcEncodedColumnBatch col = ECB_POOL.take(); + col.init(fileId, key.stripeIx, rgIx, cols.length); + boolean hasAnyCached = false; + try { + key.rgIx = rgIx; + for (int colIxMod = 0; colIxMod < cols.length; ++colIxMod) { + boolean[] readMask = cols[colIxMod]; + // Check if RG is eliminated by SARG + if ((readMask == SargApplier.READ_NO_RGS) || (readMask != SargApplier.READ_ALL_RGS + && (readMask.length <= rgIx || !readMask[rgIx]))) continue; + key.colIx = columnIds.get(colIxMod); + ColumnStreamData[] cached = cache.get(key); + if (cached == null) { + isMissingAnyRgs[colIxMod] = true; + continue; + } + assert cached.length == OrcEncodedColumnBatch.MAX_DATA_STREAMS; + col.setAllStreamsData(colIxMod, key.colIx, cached); + hasAnyCached = true; + if (readMask == SargApplier.READ_ALL_RGS) { + // We were going to read all RGs, but some were in cache, allocate the mask. + cols[colIxMod] = readMask = new boolean[totalRgCount]; + Arrays.fill(readMask, true); + } + readMask[rgIx] = false; // Got from cache, don't read from disk. + } + } catch (Throwable t) { + // TODO: Any cleanup needed to release data in col back to cache should be here. + throw (t instanceof IOException) ? (IOException)t : new IOException(t); + } + if (hasAnyCached) { + consumer.consumeData(col); + } + } + boolean makeStripeColList = false; // By default assume we'll fetch all original columns. + for (int colIxMod = 0; colIxMod < cols.length; ++colIxMod) { + if (isMissingAnyRgs[colIxMod]) { + if (makeStripeColList) { + stripeColsNotInCache[stripeIxMod].add(columnIds.get(colIxMod)); + } + } else if (!makeStripeColList) { + // Some columns were fully in cache. Make a per-stripe col list, add previous columns. + makeStripeColList = true; + stripeColsNotInCache[stripeIxMod] = new ArrayList<Integer>(cols.length - 1); + for (int i = 0; i < colIxMod; ++i) { + stripeColsNotInCache[stripeIxMod].add(columnIds.get(i)); + } + } + } + } + return stripeColsNotInCache; + } + + @Override + public void setDone() { + consumer.setDone(); + } + + @Override + public void consumeData(OrcEncodedColumnBatch data) { + // Store object in cache; create new key object - cannot be reused. + assert cache != null; + throw new UnsupportedOperationException("not implemented"); + /*for (int i = 0; i < data.getColumnData().length; ++i) { + OrcCacheKey key = new OrcCacheKey(data.getBatchKey(), data.getColumnIxs()[i]); + ColumnStreamData[] toCache = data.getColumnData()[i]; + ColumnStreamData[] cached = cache.cacheOrGet(key, toCache); + if (toCache != cached) { + for (ColumnStreamData sb : toCache) { + if (sb.decRef() != 0) continue; + lowLevelCache.releaseBuffers(sb.getCacheBuffers()); + } + data.getColumnData()[i] = cached; + } + } + consumer.consumeData(data);*/ + } + + @Override + public void setError(Throwable t) { + consumer.setError(t); + } + + private class DataWrapperForOrc implements DataReader, DataCache { + private DataReader orcDataReader; + + public DataWrapperForOrc() { + boolean useZeroCopy = (conf != null) && OrcConf.USE_ZEROCOPY.getBoolean(conf); + if (useZeroCopy && !lowLevelCache.getAllocator().isDirectAlloc()) { + throw new UnsupportedOperationException("Cannot use zero-copy reader with non-direct cache " + + "buffers; either disable zero-copy or enable direct cache allocation"); + } + this.orcDataReader = orcReader.createDefaultDataReader(useZeroCopy); + } + + @Override + public DiskRangeList getFileData(long fileId, DiskRangeList range, + long baseOffset, DiskRangeListFactory factory, BooleanRef gotAllData) { + return lowLevelCache.getFileData(fileId, range, baseOffset, factory, counters, gotAllData); + } + + @Override + public long[] putFileData(long fileId, DiskRange[] ranges, + MemoryBuffer[] data, long baseOffset) { + return lowLevelCache.putFileData( + fileId, ranges, data, baseOffset, Priority.NORMAL, counters); + } + + @Override + public void releaseBuffer(MemoryBuffer buffer) { + lowLevelCache.releaseBuffer(buffer); + } + + @Override + public void reuseBuffer(MemoryBuffer buffer) { + boolean isReused = lowLevelCache.reuseBuffer(buffer); + assert isReused; + } + + @Override + public Allocator getAllocator() { + return lowLevelCache.getAllocator(); + } + + @Override + public void close() throws IOException { + orcDataReader.close(); + } + + @Override + public DiskRangeList readFileData(DiskRangeList range, long baseOffset, + boolean doForceDirect) throws IOException { + long startTime = counters.startTimeCounter(); + DiskRangeList result = orcDataReader.readFileData(range, baseOffset, doForceDirect); + counters.recordHdfsTime(startTime); + if (DebugUtils.isTraceOrcEnabled() && LOG.isInfoEnabled()) { + LOG.info("Disk ranges after disk read (file " + fileId + ", base offset " + baseOffset + + "): " + RecordReaderUtils.stringifyDiskRanges(result)); + } + return result; + } + + @Override + public boolean isTrackingDiskRanges() { + return orcDataReader.isTrackingDiskRanges(); + } + + @Override + public void releaseBuffer(ByteBuffer buffer) { + orcDataReader.releaseBuffer(buffer); + } + + @Override + public void open() throws IOException { + long startTime = counters.startTimeCounter(); + orcDataReader.open(); + counters.recordHdfsTime(startTime); + } + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/f324305a/pom.xml ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/f324305a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/f324305a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java ---------------------------------------------------------------------- diff --cc ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java index 5770bef,2500fb6..ffeaaa0 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java @@@ -56,9 -56,9 +56,10 @@@ import org.apache.hadoop.hive.ql.io.Aci import org.apache.hadoop.hive.ql.io.AcidUtils.Directory; import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; import org.apache.hadoop.hive.ql.io.InputFormatChecker; +import org.apache.hadoop.hive.ql.io.LlapWrappableInputFormatInterface; import org.apache.hadoop.hive.ql.io.RecordIdentifier; import org.apache.hadoop.hive.ql.io.StatsProvidingRecordReader; + import org.apache.hadoop.hive.ql.io.orc.OrcFile.WriterVersion; import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.Context; import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg; import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf; @@@ -264,10 -265,9 +266,9 @@@ public class OrcInputFormat implement boolean[] result = new boolean[numColumns]; result[0] = true; OrcProto.Type root = types.get(rootColumn); - for(int i=0; i < root.getSubtypesCount(); ++i) { + for (int i = 0; i < root.getSubtypesCount(); ++i) { if (included.contains(i)) { - includeColumnRecursive(types, result, root.getSubtypes(i), - rootColumn); + includeColumnRecursive(types, result, root.getSubtypes(i), rootColumn); } } return result; @@@ -866,33 -901,11 +904,11 @@@ // we can't eliminate stripes if there are deltas because the // deltas may change the rows making them match the predicate. - if (deltas.isEmpty()) { - Reader.Options options = new Reader.Options(); - options.include(includedCols); - setSearchArgument(options, types, context.conf, isOriginal); - // only do split pruning if HIVE-8732 has been fixed in the writer - if (options.getSearchArgument() != null && - writerVersion != OrcFile.WriterVersion.ORIGINAL) { - SearchArgument sarg = options.getSearchArgument(); - List<PredicateLeaf> sargLeaves = sarg.getLeaves(); - int[] filterColumns = RecordReaderImpl.mapSargColumns(sargLeaves, - options.getColumnNames(), getRootColumn(isOriginal)); - - if (stripeStats != null) { - // eliminate stripes that doesn't satisfy the predicate condition - includeStripe = new boolean[stripes.size()]; - for (int i = 0; i < stripes.size(); ++i) { - includeStripe[i] = (i >= stripeStats.size()) || - isStripeSatisfyPredicate(stripeStats.get(i), sarg, - filterColumns); - if (isDebugEnabled && !includeStripe[i]) { - LOG.debug("Eliminating ORC stripe-" + i + " of file '" + - fileWithId.getFileStatus().getPath() + "' as it did not satisfy " + - "predicate condition."); - } - } - } - } + if (deltas.isEmpty() && canCreateSargFromConf(context.conf)) { + SearchArgument sarg = ConvertAstToSearchArg.createFromConf(context.conf); + String[] sargColNames = extractNeededColNames(types, context.conf, includedCols, isOriginal); + includeStripe = pickStripes(sarg, sargColNames, writerVersion, isOriginal, - metadata.getStripeStatistics(), stripes.size(), file.getPath()); ++ stripeStats, stripes.size(), file.getPath()); } // if we didn't have predicate pushdown, read everything http://git-wip-us.apache.org/repos/asf/hive/blob/f324305a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/f324305a/ql/src/test/results/clientpositive/spark/vector_count_distinct.q.out ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/f324305a/ql/src/test/results/clientpositive/spark/vector_decimal_aggregate.q.out ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/f324305a/ql/src/test/results/clientpositive/spark/vector_distinct_2.q.out ---------------------------------------------------------------------- diff --cc ql/src/test/results/clientpositive/spark/vector_distinct_2.q.out index c974d00,52c00f9..563213a --- a/ql/src/test/results/clientpositive/spark/vector_distinct_2.q.out +++ b/ql/src/test/results/clientpositive/spark/vector_distinct_2.q.out @@@ -143,20 -143,24 +143,32 @@@ STAGE PLANS Statistics: Num rows: 2000 Data size: 918712 Basic stats: COMPLETE Column stats: NONE Execution mode: vectorized Reducer 2 + Execution mode: vectorized Reduce Operator Tree: Group By Operator - keys: KEY._col0 (type: string), KEY._col1 (type: tinyint) + keys: KEY._col0 (type: tinyint), KEY._col1 (type: string) mode: mergepartial outputColumnNames: _col0, _col1 Statistics: Num rows: 1000 Data size: 459356 Basic stats: COMPLETE Column stats: NONE - File Output Operator - compressed: false + Select Operator + expressions: _col1 (type: string), _col0 (type: tinyint) + outputColumnNames: _col0, _col1 Statistics: Num rows: 1000 Data size: 459356 Basic stats: COMPLETE Column stats: NONE ++<<<<<<< HEAD + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe ++======= + File Output Operator + compressed: false + Statistics: Num rows: 1000 Data size: 459356 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + Execution mode: vectorized ++>>>>>>> master Stage: Stage-0 Fetch Operator http://git-wip-us.apache.org/repos/asf/hive/blob/f324305a/ql/src/test/results/clientpositive/spark/vector_groupby_3.q.out ---------------------------------------------------------------------- diff --cc ql/src/test/results/clientpositive/spark/vector_groupby_3.q.out index 9687ec1,2255f72..b2402db --- a/ql/src/test/results/clientpositive/spark/vector_groupby_3.q.out +++ b/ql/src/test/results/clientpositive/spark/vector_groupby_3.q.out @@@ -153,13 -152,18 +153,25 @@@ STAGE PLANS mode: mergepartial outputColumnNames: _col0, _col1, _col2 Statistics: Num rows: 1000 Data size: 459356 Basic stats: COMPLETE Column stats: NONE - File Output Operator - compressed: false + Select Operator + expressions: _col1 (type: string), _col0 (type: tinyint), _col2 (type: bigint) + outputColumnNames: _col0, _col1, _col2 Statistics: Num rows: 1000 Data size: 459356 Basic stats: COMPLETE Column stats: NONE ++<<<<<<< HEAD + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe ++======= + File Output Operator + compressed: false + Statistics: Num rows: 1000 Data size: 459356 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + Execution mode: vectorized ++>>>>>>> master Stage: Stage-0 Fetch Operator http://git-wip-us.apache.org/repos/asf/hive/blob/f324305a/ql/src/test/results/clientpositive/spark/vector_mapjoin_reduce.q.out ---------------------------------------------------------------------- diff --cc ql/src/test/results/clientpositive/spark/vector_mapjoin_reduce.q.out index a3a44df,bbc66fc..6308cee --- a/ql/src/test/results/clientpositive/spark/vector_mapjoin_reduce.q.out +++ b/ql/src/test/results/clientpositive/spark/vector_mapjoin_reduce.q.out @@@ -65,22 -65,17 +65,18 @@@ STAGE PLANS Filter Operator predicate: l_partkey is not null (type: boolean) Statistics: Num rows: 50 Data size: 5999 Basic stats: COMPLETE Column stats: NONE - Select Operator - expressions: l_partkey (type: int) + Group By Operator + keys: l_partkey (type: int) + mode: hash outputColumnNames: _col0 Statistics: Num rows: 50 Data size: 5999 Basic stats: COMPLETE Column stats: NONE - Group By Operator - keys: _col0 (type: int) - mode: hash - outputColumnNames: _col0 + Reduce Output Operator + key expressions: _col0 (type: int) + sort order: + + Map-reduce partition columns: _col0 (type: int) Statistics: Num rows: 50 Data size: 5999 Basic stats: COMPLETE Column stats: NONE - Reduce Output Operator - key expressions: _col0 (type: int) - sort order: + - Map-reduce partition columns: _col0 (type: int) - Statistics: Num rows: 50 Data size: 5999 Basic stats: COMPLETE Column stats: NONE Reducer 4 + Execution mode: vectorized Local Work: Map Reduce Local Work Reduce Operator Tree: @@@ -270,22 -266,17 +266,18 @@@ STAGE PLANS Filter Operator predicate: l_partkey is not null (type: boolean) Statistics: Num rows: 50 Data size: 5999 Basic stats: COMPLETE Column stats: NONE - Select Operator - expressions: l_partkey (type: int) + Group By Operator + keys: l_partkey (type: int) + mode: hash outputColumnNames: _col0 Statistics: Num rows: 50 Data size: 5999 Basic stats: COMPLETE Column stats: NONE - Group By Operator - keys: _col0 (type: int) - mode: hash - outputColumnNames: _col0 + Reduce Output Operator + key expressions: _col0 (type: int) + sort order: + + Map-reduce partition columns: _col0 (type: int) Statistics: Num rows: 50 Data size: 5999 Basic stats: COMPLETE Column stats: NONE - Reduce Output Operator - key expressions: _col0 (type: int) - sort order: + - Map-reduce partition columns: _col0 (type: int) - Statistics: Num rows: 50 Data size: 5999 Basic stats: COMPLETE Column stats: NONE Reducer 4 + Execution mode: vectorized Local Work: Map Reduce Local Work Reduce Operator Tree: http://git-wip-us.apache.org/repos/asf/hive/blob/f324305a/ql/src/test/results/clientpositive/spark/vector_orderby_5.q.out ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/f324305a/ql/src/test/results/clientpositive/spark/vectorization_0.q.out ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/f324305a/ql/src/test/results/clientpositive/spark/vectorization_13.q.out ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/f324305a/ql/src/test/results/clientpositive/spark/vectorization_15.q.out ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/f324305a/ql/src/test/results/clientpositive/spark/vectorization_short_regress.q.out ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/f324305a/ql/src/test/results/clientpositive/spark/vectorized_nested_mapjoin.q.out ---------------------------------------------------------------------- diff --cc ql/src/test/results/clientpositive/spark/vectorized_nested_mapjoin.q.out index 7308fb2,316ed63..bf23ae4 --- a/ql/src/test/results/clientpositive/spark/vectorized_nested_mapjoin.q.out +++ b/ql/src/test/results/clientpositive/spark/vectorized_nested_mapjoin.q.out @@@ -97,24 -97,19 +97,27 @@@ STAGE PLANS input vertices: 1 Map 4 Statistics: Num rows: 7433 Data size: 228226 Basic stats: COMPLETE Column stats: NONE - Select Operator - expressions: _col1 (type: double) + Group By Operator + aggregations: sum(_col1) + mode: hash outputColumnNames: _col0 - Statistics: Num rows: 7433 Data size: 228226 Basic stats: COMPLETE Column stats: NONE - Group By Operator - aggregations: sum(_col0) - mode: hash - outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + sort order: Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE ++<<<<<<< HEAD + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: double) + Execution mode: vectorized ++======= + value expressions: _col0 (type: double) ++>>>>>>> master Local Work: Map Reduce Local Work - Execution mode: vectorized Reducer 3 + Execution mode: vectorized Reduce Operator Tree: Group By Operator aggregations: sum(VALUE._col0) http://git-wip-us.apache.org/repos/asf/hive/blob/f324305a/ql/src/test/results/clientpositive/spark/vectorized_timestamp_funcs.q.out ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/f324305a/ql/src/test/results/clientpositive/tez/dynpart_sort_opt_vectorization.q.out ---------------------------------------------------------------------- diff --cc ql/src/test/results/clientpositive/tez/dynpart_sort_opt_vectorization.q.out index 2e87e2c,63e6ade..8e4a501 --- a/ql/src/test/results/clientpositive/tez/dynpart_sort_opt_vectorization.q.out +++ b/ql/src/test/results/clientpositive/tez/dynpart_sort_opt_vectorization.q.out @@@ -1591,37 -1591,37 +1591,37 @@@ STAGE PLANS Filter Operator predicate: (t is null or (t = 27)) (type: boolean) Statistics: Num rows: 1048 Data size: 310873 Basic stats: COMPLETE Column stats: NONE - Select Operator - expressions: si (type: smallint), i (type: int), b (type: bigint), f (type: float), t (type: tinyint) + Group By Operator + keys: t (type: tinyint), si (type: smallint), i (type: int), b (type: bigint), f (type: float) + mode: hash outputColumnNames: _col0, _col1, _col2, _col3, _col4 Statistics: Num rows: 1048 Data size: 310873 Basic stats: COMPLETE Column stats: NONE - Group By Operator - keys: _col0 (type: smallint), _col1 (type: int), _col2 (type: bigint), _col3 (type: float), _col4 (type: tinyint) - mode: hash - outputColumnNames: _col0, _col1, _col2, _col3, _col4 + Reduce Output Operator + key expressions: _col0 (type: tinyint), _col1 (type: smallint), _col2 (type: int), _col3 (type: bigint), _col4 (type: float) + sort order: +++++ + Map-reduce partition columns: _col0 (type: tinyint), _col1 (type: smallint), _col2 (type: int), _col3 (type: bigint), _col4 (type: float) Statistics: Num rows: 1048 Data size: 310873 Basic stats: COMPLETE Column stats: NONE - Reduce Output Operator - key expressions: _col0 (type: smallint), _col1 (type: int), _col2 (type: bigint), _col3 (type: float), _col4 (type: tinyint) - sort order: +++++ - Map-reduce partition columns: _col0 (type: smallint), _col1 (type: int), _col2 (type: bigint), _col3 (type: float), _col4 (type: tinyint) - Statistics: Num rows: 1048 Data size: 310873 Basic stats: COMPLETE Column stats: NONE Execution mode: vectorized Reducer 2 + Execution mode: vectorized Reduce Operator Tree: Group By Operator - keys: KEY._col0 (type: smallint), KEY._col1 (type: int), KEY._col2 (type: bigint), KEY._col3 (type: float), KEY._col4 (type: tinyint) + keys: KEY._col0 (type: tinyint), KEY._col1 (type: smallint), KEY._col2 (type: int), KEY._col3 (type: bigint), KEY._col4 (type: float) mode: mergepartial outputColumnNames: _col0, _col1, _col2, _col3, _col4 Statistics: Num rows: 524 Data size: 155436 Basic stats: COMPLETE Column stats: NONE - File Output Operator - compressed: false + Select Operator + expressions: _col1 (type: smallint), _col2 (type: int), _col3 (type: bigint), _col4 (type: float), _col0 (type: tinyint) + outputColumnNames: _col0, _col1, _col2, _col3, _col4 Statistics: Num rows: 524 Data size: 155436 Basic stats: COMPLETE Column stats: NONE - table: - input format: org.apache.hadoop.mapred.TextInputFormat - output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat - serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe - name: default.over1k_part2_orc + File Output Operator + compressed: false + Statistics: Num rows: 524 Data size: 155436 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + name: default.over1k_part2_orc - Execution mode: vectorized Stage: Stage-2 Dependency Collection @@@ -1670,50 -1669,37 +1669,37 @@@ STAGE PLANS Filter Operator predicate: (t is null or (t = 27)) (type: boolean) Statistics: Num rows: 1048 Data size: 310873 Basic stats: COMPLETE Column stats: NONE - Select Operator - expressions: si (type: smallint), i (type: int), b (type: bigint), f (type: float), t (type: tinyint) + Group By Operator + keys: t (type: tinyint), si (type: smallint), i (type: int), b (type: bigint), f (type: float) + mode: hash outputColumnNames: _col0, _col1, _col2, _col3, _col4 Statistics: Num rows: 1048 Data size: 310873 Basic stats: COMPLETE Column stats: NONE - Group By Operator - keys: _col0 (type: smallint), _col1 (type: int), _col2 (type: bigint), _col3 (type: float), _col4 (type: tinyint) - mode: hash - outputColumnNames: _col0, _col1, _col2, _col3, _col4 + Reduce Output Operator + key expressions: _col0 (type: tinyint), _col1 (type: smallint), _col2 (type: int), _col3 (type: bigint), _col4 (type: float) + sort order: +++++ + Map-reduce partition columns: _col0 (type: tinyint) Statistics: Num rows: 1048 Data size: 310873 Basic stats: COMPLETE Column stats: NONE - Reduce Output Operator - key expressions: _col0 (type: smallint), _col1 (type: int), _col2 (type: bigint), _col3 (type: float), _col4 (type: tinyint) - sort order: +++++ - Map-reduce partition columns: _col0 (type: smallint), _col1 (type: int), _col2 (type: bigint), _col3 (type: float), _col4 (type: tinyint) - Statistics: Num rows: 1048 Data size: 310873 Basic stats: COMPLETE Column stats: NONE Execution mode: vectorized Reducer 2 + Execution mode: vectorized Reduce Operator Tree: Group By Operator - keys: KEY._col0 (type: smallint), KEY._col1 (type: int), KEY._col2 (type: bigint), KEY._col3 (type: float), KEY._col4 (type: tinyint) + keys: KEY._col0 (type: tinyint), KEY._col1 (type: smallint), KEY._col2 (type: int), KEY._col3 (type: bigint), KEY._col4 (type: float) mode: mergepartial outputColumnNames: _col0, _col1, _col2, _col3, _col4 Statistics: Num rows: 524 Data size: 155436 Basic stats: COMPLETE Column stats: NONE - Reduce Output Operator - key expressions: _col4 (type: tinyint) - sort order: + - Map-reduce partition columns: _col4 (type: tinyint) - Statistics: Num rows: 524 Data size: 155436 Basic stats: COMPLETE Column stats: NONE - value expressions: _col0 (type: smallint), _col1 (type: int), _col2 (type: bigint), _col3 (type: float), _col4 (type: tinyint) - Reducer 3 - Execution mode: vectorized - Reduce Operator Tree: - Select Operator - expressions: VALUE._col0 (type: smallint), VALUE._col1 (type: int), VALUE._col2 (type: bigint), VALUE._col3 (type: float), VALUE._col4 (type: tinyint) - outputColumnNames: _col0, _col1, _col2, _col3, _col4 - Statistics: Num rows: 524 Data size: 155436 Basic stats: COMPLETE Column stats: NONE - File Output Operator - compressed: false + Select Operator + expressions: _col1 (type: smallint), _col2 (type: int), _col3 (type: bigint), _col4 (type: float), _col0 (type: tinyint) + outputColumnNames: _col0, _col1, _col2, _col3, _col4 Statistics: Num rows: 524 Data size: 155436 Basic stats: COMPLETE Column stats: NONE - table: - input format: org.apache.hadoop.mapred.TextInputFormat - output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat - serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe - name: default.over1k_part2_orc + File Output Operator + compressed: false + Statistics: Num rows: 524 Data size: 155436 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + name: default.over1k_part2_orc - Execution mode: vectorized Stage: Stage-2 Dependency Collection http://git-wip-us.apache.org/repos/asf/hive/blob/f324305a/ql/src/test/results/clientpositive/tez/mrr.q.out ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/f324305a/ql/src/test/results/clientpositive/tez/vector_binary_join_groupby.q.out ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/f324305a/ql/src/test/results/clientpositive/tez/vector_count_distinct.q.out ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/f324305a/ql/src/test/results/clientpositive/tez/vector_decimal_aggregate.q.out ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/f324305a/ql/src/test/results/clientpositive/tez/vector_decimal_udf.q.out ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/f324305a/ql/src/test/results/clientpositive/tez/vector_distinct_2.q.out ---------------------------------------------------------------------- diff --cc ql/src/test/results/clientpositive/tez/vector_distinct_2.q.out index a1063ab,6c31294..44d207b --- a/ql/src/test/results/clientpositive/tez/vector_distinct_2.q.out +++ b/ql/src/test/results/clientpositive/tez/vector_distinct_2.q.out @@@ -143,20 -143,24 +143,24 @@@ STAGE PLANS Statistics: Num rows: 2000 Data size: 918712 Basic stats: COMPLETE Column stats: NONE Execution mode: vectorized Reducer 2 + Execution mode: vectorized Reduce Operator Tree: Group By Operator - keys: KEY._col0 (type: string), KEY._col1 (type: tinyint) + keys: KEY._col0 (type: tinyint), KEY._col1 (type: string) mode: mergepartial outputColumnNames: _col0, _col1 Statistics: Num rows: 1000 Data size: 459356 Basic stats: COMPLETE Column stats: NONE - File Output Operator - compressed: false + Select Operator + expressions: _col1 (type: string), _col0 (type: tinyint) + outputColumnNames: _col0, _col1 Statistics: Num rows: 1000 Data size: 459356 Basic stats: COMPLETE Column stats: NONE - table: - input format: org.apache.hadoop.mapred.TextInputFormat - output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat - serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + File Output Operator + compressed: false + Statistics: Num rows: 1000 Data size: 459356 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe - Execution mode: vectorized Stage: Stage-0 Fetch Operator
