HIVE-12878: Support Vectorization for TEXTFILE and other formats (Matt McCline, reviewed by Sergey Shelukhin)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/d5285d8e Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/d5285d8e Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/d5285d8e Branch: refs/heads/llap Commit: d5285d8ebaf5bef2a13b2c2338be2fe683804b02 Parents: 2f0339b Author: Matt McCline <[email protected]> Authored: Mon May 2 16:58:53 2016 -0700 Committer: Matt McCline <[email protected]> Committed: Mon May 2 16:58:53 2016 -0700 ---------------------------------------------------------------------- .../org/apache/hadoop/hive/conf/HiveConf.java | 9 + data/files/struct1_a.txt | 4 + data/files/struct1_b.txt | 1 + data/files/struct1_c.txt | 1 + data/files/struct2_a.txt | 2 + data/files/struct2_b.txt | 2 + data/files/struct2_c.txt | 1 + data/files/struct2_d.txt | 1 + data/files/struct3_a.txt | 2 + data/files/struct3_b.txt | 1 + data/files/struct3_c.txt | 1 + data/files/struct4_a.txt | 2 + data/files/struct4_b.txt | 1 + data/files/struct4_c.txt | 1 + .../test/resources/testconfiguration.properties | 28 +- .../hive/llap/io/api/impl/LlapInputFormat.java | 2 +- .../hive/ql/exec/AbstractMapOperator.java | 178 + .../apache/hadoop/hive/ql/exec/MapOperator.java | 86 +- .../apache/hadoop/hive/ql/exec/Utilities.java | 33 +- .../hadoop/hive/ql/exec/mr/ExecMapper.java | 3 +- .../ql/exec/spark/SparkMapRecordHandler.java | 3 +- .../hadoop/hive/ql/exec/tez/DagUtils.java | 3 +- .../hive/ql/exec/tez/MapRecordProcessor.java | 15 +- .../hive/ql/exec/tez/MapRecordSource.java | 6 +- .../hive/ql/exec/tez/ReduceRecordSource.java | 4 +- .../vector/VectorAppMasterEventOperator.java | 16 +- .../hive/ql/exec/vector/VectorAssignRow.java | 1111 ++-- .../ql/exec/vector/VectorAssignRowDynBatch.java | 41 - .../exec/vector/VectorAssignRowSameBatch.java | 36 - .../ql/exec/vector/VectorDeserializeRow.java | 1114 ++-- .../hive/ql/exec/vector/VectorExtractRow.java | 971 +--- .../exec/vector/VectorExtractRowDynBatch.java | 40 - .../exec/vector/VectorExtractRowSameBatch.java | 36 - .../ql/exec/vector/VectorFileSinkOperator.java | 16 +- .../ql/exec/vector/VectorGroupByOperator.java | 13 +- .../exec/vector/VectorMapJoinBaseOperator.java | 11 +- .../ql/exec/vector/VectorMapJoinOperator.java | 4 +- .../VectorMapJoinOuterFilteredOperator.java | 17 +- .../hive/ql/exec/vector/VectorMapOperator.java | 848 ++- .../exec/vector/VectorReduceSinkOperator.java | 16 +- .../exec/vector/VectorSMBMapJoinOperator.java | 11 +- .../VectorSparkHashTableSinkOperator.java | 16 +- ...VectorSparkPartitionPruningSinkOperator.java | 13 +- .../ql/exec/vector/VectorizationContext.java | 12 +- .../ql/exec/vector/VectorizedBatchUtil.java | 49 + .../VectorMapJoinGenerateResultOperator.java | 8 +- .../fast/VectorMapJoinFastLongHashUtil.java | 10 +- .../fast/VectorMapJoinFastStringCommon.java | 10 +- .../hadoop/hive/ql/io/HiveInputFormat.java | 6 +- .../hadoop/hive/ql/io/NullRowsInputFormat.java | 2 +- .../hadoop/hive/ql/io/orc/OrcInputFormat.java | 4 +- .../ql/io/parquet/MapredParquetInputFormat.java | 2 +- .../hive/ql/optimizer/physical/Vectorizer.java | 334 +- .../apache/hadoop/hive/ql/plan/BaseWork.java | 23 + .../org/apache/hadoop/hive/ql/plan/MapWork.java | 11 + .../hive/ql/plan/VectorPartitionConversion.java | 172 +- .../hive/ql/plan/VectorPartitionDesc.java | 164 +- .../ql/exec/vector/TestVectorRowObject.java | 14 +- .../hive/ql/exec/vector/TestVectorSerDeRow.java | 169 +- .../hive/ql/io/orc/TestInputOutputFormat.java | 10 +- .../avro_schema_evolution_native.q | 18 + .../queries/clientpositive/bucket_groupby.q | 33 +- .../queries/clientpositive/groupby_sort_10.q | 2 + .../schema_evol_orc_acidvec_mapwork_part.q | 3 + .../schema_evol_orc_acidvec_mapwork_table.q | 3 + .../schema_evol_orc_nonvec_mapwork_table.q | 2 - .../schema_evol_orc_vec_mapwork_part.q | 3 + .../schema_evol_orc_vec_mapwork_table.q | 7 +- .../schema_evol_text_fetchwork_table.q | 56 - .../schema_evol_text_mapwork_table.q | 56 - .../schema_evol_text_nonvec_fetchwork_part.q | 98 - .../schema_evol_text_nonvec_fetchwork_table.q | 67 - .../schema_evol_text_nonvec_mapwork_part.q | 828 ++- ..._evol_text_nonvec_mapwork_part_all_complex.q | 159 + ...vol_text_nonvec_mapwork_part_all_primitive.q | 509 ++ .../schema_evol_text_nonvec_mapwork_table.q | 822 ++- .../schema_evol_text_vec_mapwork_part.q | 827 +++ ...ema_evol_text_vec_mapwork_part_all_complex.q | 164 + ...a_evol_text_vec_mapwork_part_all_primitive.q | 514 ++ .../schema_evol_text_vec_mapwork_table.q | 826 +++ .../schema_evol_text_vecrow_mapwork_part.q | 827 +++ ..._evol_text_vecrow_mapwork_part_all_complex.q | 165 + ...vol_text_vecrow_mapwork_part_all_primitive.q | 514 ++ .../schema_evol_text_vecrow_mapwork_table.q | 826 +++ .../clientpositive/tez_schema_evolution.q | 1 + .../avro_schema_evolution_native.q.out | 206 + .../results/clientpositive/bucket_groupby.q.out | 308 +- .../clientpositive/groupby_sort_10.q.out | 8 +- .../schema_evol_text_fetchwork_table.q.out | 298 -- .../schema_evol_text_mapwork_table.q.out | 298 -- ...schema_evol_text_nonvec_fetchwork_part.q.out | 642 --- ...chema_evol_text_nonvec_fetchwork_table.q.out | 297 -- .../schema_evol_text_nonvec_mapwork_part.q.out | 4909 ++++++++++++++++-- ...l_text_nonvec_mapwork_part_all_complex.q.out | 726 +++ ...text_nonvec_mapwork_part_all_primitive.q.out | 3038 +++++++++++ .../schema_evol_text_nonvec_mapwork_table.q.out | 4376 +++++++++++++++- .../schema_evol_text_vec_mapwork_part.q.out | 4479 ++++++++++++++++ ...evol_text_vec_mapwork_part_all_complex.q.out | 730 +++ ...ol_text_vec_mapwork_part_all_primitive.q.out | 3058 +++++++++++ .../schema_evol_text_vec_mapwork_table.q.out | 4221 +++++++++++++++ .../schema_evol_text_vecrow_mapwork_part.q.out | 4479 ++++++++++++++++ ...l_text_vecrow_mapwork_part_all_complex.q.out | 732 +++ ...text_vecrow_mapwork_part_all_primitive.q.out | 3058 +++++++++++ .../schema_evol_text_vecrow_mapwork_table.q.out | 4221 +++++++++++++++ .../tez/schema_evol_text_fetchwork_table.q.out | 298 -- .../tez/schema_evol_text_mapwork_table.q.out | 298 -- ...schema_evol_text_nonvec_fetchwork_part.q.out | 642 --- ...chema_evol_text_nonvec_fetchwork_table.q.out | 297 -- .../schema_evol_text_nonvec_mapwork_part.q.out | 4453 ++++++++++++++-- ...l_text_nonvec_mapwork_part_all_complex.q.out | 669 +++ ...text_nonvec_mapwork_part_all_primitive.q.out | 2734 ++++++++++ .../schema_evol_text_nonvec_mapwork_table.q.out | 3920 +++++++++++++- .../tez/schema_evol_text_vec_mapwork_part.q.out | 3999 ++++++++++++++ ...evol_text_vec_mapwork_part_all_complex.q.out | 673 +++ ...ol_text_vec_mapwork_part_all_primitive.q.out | 2738 ++++++++++ .../schema_evol_text_vec_mapwork_table.q.out | 3741 +++++++++++++ .../schema_evol_text_vecrow_mapwork_part.q.out | 3999 ++++++++++++++ ...l_text_vecrow_mapwork_part_all_complex.q.out | 675 +++ ...text_vecrow_mapwork_part_all_primitive.q.out | 2738 ++++++++++ .../schema_evol_text_vecrow_mapwork_table.q.out | 3741 +++++++++++++ .../vector_orc_string_reader_empty_dict.q.out | 62 + .../tez/vector_partition_diff_num_cols.q.out | 1 + .../tez/vector_tablesample_rows.q.out | 307 ++ .../vector_partition_diff_num_cols.q.out | 1 + .../vector_tablesample_rows.q.out | 2 - .../fast/BinarySortableDeserializeRead.java | 806 +-- .../hive/serde2/fast/DeserializeRead.java | 379 +- .../lazy/fast/LazySimpleDeserializeRead.java | 704 +-- .../fast/LazyBinaryDeserializeRead.java | 944 +--- .../apache/hadoop/hive/serde2/VerifyFast.java | 75 +- 130 files changed, 82172 insertions(+), 9858 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/d5285d8e/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 2814353..caadf2a 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -2481,6 +2481,15 @@ public class HiveConf extends Configuration { "This flag should be set to true to enable the new vectorization\n" + "of queries using ReduceSink.\ni" + "The default value is true."), + HIVE_VECTORIZATION_USE_VECTORIZED_INPUT_FILE_FORMAT("hive.vectorized.use.vectorized.input.format", true, + "This flag should be set to true to enable vectorizing with vectorized input file format capable SerDe.\n" + + "The default value is true."), + HIVE_VECTORIZATION_USE_VECTOR_DESERIALIZE("hive.vectorized.use.vector.serde.deserialize", false, + "This flag should be set to true to enable vectorizing rows using vector deserialize.\n" + + "The default value is false."), + HIVE_VECTORIZATION_USE_ROW_DESERIALIZE("hive.vectorized.use.row.serde.deserialize", false, + "This flag should be set to true to enable vectorizing using row deserialize.\n" + + "The default value is false."), HIVE_TYPE_CHECK_ON_INSERT("hive.typecheck.on.insert", true, "This property has been extended to control " + "whether to check, convert, and normalize partition value to conform to its column type in " + "partition operations including but not limited to insert, such as alter, describe etc."), http://git-wip-us.apache.org/repos/asf/hive/blob/d5285d8e/data/files/struct1_a.txt ---------------------------------------------------------------------- diff --git a/data/files/struct1_a.txt b/data/files/struct1_a.txt new file mode 100644 index 0000000..b36846e --- /dev/null +++ b/data/files/struct1_a.txt @@ -0,0 +1,4 @@ +1|true,200,72909,3244222,-99999999999,-29.0764,470614135,470614135,dynamic reptile ,dynamic reptile ,0004-09-22 18:26:29.519542222,2007-02-09,binary|original +2|0,100,483777,14,-23866739993,-3651.672121,46114.284799488,46114.284799488, baffling , baffling ,2007-02-09 05:17:29.368756876,0004-09-22,binary|original +3|false,72,3244222,-93222,30.774,-66475.561431,-66475.561431,0.561431,1,1,6229-06-28 02:54:28.970117179,5966-07-09,binary|original +4|1,-90,754072151,3289094,46114.284799488,9250340.75,9250340.75,9250340.75,junkyard,junkyard,2002-05-10 05:29:48.990818073,1815-05-06,binary|original http://git-wip-us.apache.org/repos/asf/hive/blob/d5285d8e/data/files/struct1_b.txt ---------------------------------------------------------------------- diff --git a/data/files/struct1_b.txt b/data/files/struct1_b.txt new file mode 100644 index 0000000..1887c68 --- /dev/null +++ b/data/files/struct1_b.txt @@ -0,0 +1 @@ +5|true,400,44388,-100,953967041.,62.079153,718.78,1,verdict,verdict,timestamp,date,binary|new http://git-wip-us.apache.org/repos/asf/hive/blob/d5285d8e/data/files/struct1_c.txt ---------------------------------------------------------------------- diff --git a/data/files/struct1_c.txt b/data/files/struct1_c.txt new file mode 100644 index 0000000..5d482c8 --- /dev/null +++ b/data/files/struct1_c.txt @@ -0,0 +1 @@ +6|false,-67,833,63993,1255178165.77663,905070.974,-4314.7918,-1240033819,trial,trial,2016-03-0703:02:22.0,2016-03-07,binary|new http://git-wip-us.apache.org/repos/asf/hive/blob/d5285d8e/data/files/struct2_a.txt ---------------------------------------------------------------------- diff --git a/data/files/struct2_a.txt b/data/files/struct2_a.txt new file mode 100644 index 0000000..7fdfef1 --- /dev/null +++ b/data/files/struct2_a.txt @@ -0,0 +1,2 @@ +3|new|true,200,72909,3244222,-99999999999,-29.0764,470614135,470614135,dynamic reptile ,dynamic reptile ,0004-09-22 18:26:29.519542222,2007-02-09,binary +4|new|0,100,483777,14,-23866739993,-3651.672121,46114.284799488,46114.284799488, baffling , baffling ,2007-02-09 05:17:29.368756876,0004-09-22,binary http://git-wip-us.apache.org/repos/asf/hive/blob/d5285d8e/data/files/struct2_b.txt ---------------------------------------------------------------------- diff --git a/data/files/struct2_b.txt b/data/files/struct2_b.txt new file mode 100644 index 0000000..a814af3 --- /dev/null +++ b/data/files/struct2_b.txt @@ -0,0 +1,2 @@ +5|new|false,72,3244222,-93222,30.774,-66475.561431,-66475.561431,0.561431,1,1,6229-06-28 02:54:28.970117179,5966-07-09,binary +6|new|1,-90,754072151,3289094,46114.284799488,9250340.75,9250340.75,9250340.75,junkyard,junkyard,2002-05-10 05:29:48.990818073,1815-05-06,binary http://git-wip-us.apache.org/repos/asf/hive/blob/d5285d8e/data/files/struct2_c.txt ---------------------------------------------------------------------- diff --git a/data/files/struct2_c.txt b/data/files/struct2_c.txt new file mode 100644 index 0000000..2c9c1bb --- /dev/null +++ b/data/files/struct2_c.txt @@ -0,0 +1 @@ +7|new|true,400,44388,-100,953967041.,62.079153,718.78,1,verdict,verdict,timestamp,date,binary \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/d5285d8e/data/files/struct2_d.txt ---------------------------------------------------------------------- diff --git a/data/files/struct2_d.txt b/data/files/struct2_d.txt new file mode 100644 index 0000000..3c7801e --- /dev/null +++ b/data/files/struct2_d.txt @@ -0,0 +1 @@ +8|new|false,-67,833,63993,1255178165.77663,905070.974,-4314.7918,-1240033819,trial,trial,2016-03-0703:02:22.0,2016-03-07,binary \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/d5285d8e/data/files/struct3_a.txt ---------------------------------------------------------------------- diff --git a/data/files/struct3_a.txt b/data/files/struct3_a.txt new file mode 100644 index 0000000..19dbd7f --- /dev/null +++ b/data/files/struct3_a.txt @@ -0,0 +1,2 @@ +1|true,200,72909,3244222,-99999999999|original +2|0,100,483777,14,-23866739993|original \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/d5285d8e/data/files/struct3_b.txt ---------------------------------------------------------------------- diff --git a/data/files/struct3_b.txt b/data/files/struct3_b.txt new file mode 100644 index 0000000..030e0c0 --- /dev/null +++ b/data/files/struct3_b.txt @@ -0,0 +1 @@ +3|true,400,44388,-100,953967041.,62.079153,718.78,1,verdict,verdict,timestamp,date,binary|new \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/d5285d8e/data/files/struct3_c.txt ---------------------------------------------------------------------- diff --git a/data/files/struct3_c.txt b/data/files/struct3_c.txt new file mode 100644 index 0000000..236694b --- /dev/null +++ b/data/files/struct3_c.txt @@ -0,0 +1 @@ +4|false,-67,833,63993,1255178165.77663,905070.974,-4314.7918,-1240033819,trial,trial,2016-03-0703:02:22.0,2016-03-07,binary|new \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/d5285d8e/data/files/struct4_a.txt ---------------------------------------------------------------------- diff --git a/data/files/struct4_a.txt b/data/files/struct4_a.txt new file mode 100644 index 0000000..ecf832f --- /dev/null +++ b/data/files/struct4_a.txt @@ -0,0 +1,2 @@ +1|original|true,200,72909,3244222,-99999999999 +2|original|0,100,483777,14,-23866739993 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/d5285d8e/data/files/struct4_b.txt ---------------------------------------------------------------------- diff --git a/data/files/struct4_b.txt b/data/files/struct4_b.txt new file mode 100644 index 0000000..701253c --- /dev/null +++ b/data/files/struct4_b.txt @@ -0,0 +1 @@ +3|new|true,400,44388,-100,953967041.,62.079153,718.78,1,verdict,verdict,timestamp,date,binary \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/d5285d8e/data/files/struct4_c.txt ---------------------------------------------------------------------- diff --git a/data/files/struct4_c.txt b/data/files/struct4_c.txt new file mode 100644 index 0000000..c56e002 --- /dev/null +++ b/data/files/struct4_c.txt @@ -0,0 +1 @@ +4|new|false,-67,833,63993,1255178165.77663,905070.974,-4314.7918,-1240033819,trial,trial,2016-03-0703:02:22.0,2016-03-07,binary \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/d5285d8e/itests/src/test/resources/testconfiguration.properties ---------------------------------------------------------------------- diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties index 0ef3161..346a38d 100644 --- a/itests/src/test/resources/testconfiguration.properties +++ b/itests/src/test/resources/testconfiguration.properties @@ -186,22 +186,28 @@ minitez.query.files.shared=acid_globallimit.q,\ ptf_streaming.q,\ sample1.q,\ schema_evol_stats.q,\ - schema_evol_text_nonvec_mapwork_table.q,\ - schema_evol_text_nonvec_fetchwork_table.q,\ - schema_evol_orc_nonvec_fetchwork_part.q,\ - schema_evol_orc_nonvec_mapwork_part.q,\ - schema_evol_text_nonvec_fetchwork_part.q,\ - schema_evol_text_nonvec_mapwork_part.q,\ schema_evol_orc_acid_mapwork_part.q,\ schema_evol_orc_acid_mapwork_table.q,\ - schema_evol_orc_acidvec_mapwork_table.q,\ schema_evol_orc_acidvec_mapwork_part.q,\ + schema_evol_orc_acidvec_mapwork_table.q,\ + schema_evol_orc_nonvec_fetchwork_part.q,\ + schema_evol_orc_nonvec_fetchwork_table.q,\ + schema_evol_orc_nonvec_mapwork_part.q,\ + schema_evol_orc_nonvec_mapwork_table.q,\ schema_evol_orc_vec_mapwork_part.q,\ - schema_evol_text_fetchwork_table.q,\ - schema_evol_text_mapwork_table.q,\ schema_evol_orc_vec_mapwork_table.q,\ - schema_evol_orc_nonvec_mapwork_table.q,\ - schema_evol_orc_nonvec_fetchwork_table.q,\ + schema_evol_text_nonvec_mapwork_part.q,\ + schema_evol_text_nonvec_mapwork_part_all_complex.q,\ + schema_evol_text_nonvec_mapwork_part_all_primitive.q,\ + schema_evol_text_nonvec_mapwork_table.q,\ + schema_evol_text_vec_mapwork_part.q,\ + schema_evol_text_vec_mapwork_part_all_complex.q,\ + schema_evol_text_vec_mapwork_part_all_primitive.q,\ + schema_evol_text_vec_mapwork_table.q,\ + schema_evol_text_vecrow_mapwork_part.q,\ + schema_evol_text_vecrow_mapwork_part_all_complex.q,\ + schema_evol_text_vecrow_mapwork_part_all_primitive.q,\ + schema_evol_text_vecrow_mapwork_table.q,\ selectDistinctStar.q,\ script_env_var1.q,\ script_env_var2.q,\ http://git-wip-us.apache.org/repos/asf/hive/blob/d5285d8e/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java index 9fb79a5..298f788 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java @@ -102,7 +102,7 @@ public class LlapInputFormat implements InputFormat<NullWritable, VectorizedRowB sourceInputFormat.getRecordReader(split, job, reporter); return rr; } - boolean isVectorMode = Utilities.isVectorMode(job); + boolean isVectorMode = Utilities.getUseVectorizedInputFileFormat(job); if (!isVectorMode) { LlapIoImpl.LOG.error("No LLAP IO in non-vectorized mode"); throw new UnsupportedOperationException("No LLAP IO in non-vectorized mode"); http://git-wip-us.apache.org/repos/asf/hive/blob/d5285d8e/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapOperator.java new file mode 100644 index 0000000..5c3012b --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapOperator.java @@ -0,0 +1,178 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.CompilationOpContext; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.plan.MapWork; +import org.apache.hadoop.hive.serde2.Deserializer; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Writable; + + +/** + * Abstract Map operator. Common code of MapOperator and VectorMapOperator. + **/ +@SuppressWarnings("deprecation") +public abstract class AbstractMapOperator extends Operator<MapWork> + implements Serializable, Cloneable { + + private static final long serialVersionUID = 1L; + + /** + * Initialization call sequence: + * + * (Operator) Operator.setConf(MapWork conf); + * (Operator) Operator.initialize( + * Configuration hconf, ObjectInspector[] inputOIs); + * + * ([Vector]MapOperator) @Override setChildren(Configuration hconf) + * + * (Operator) Operator.passExecContext(ExecMapperContext execContext) + * (Operator) Operator.initializeLocalWork(Configuration hconf) + * + * (AbstractMapOperator) initializeMapOperator(Configuration hconf) + * + * [ (AbstractMapOperator) initializeContexts() ] // exec.tez.MapRecordProcessor only. + * + * (Operator) Operator.setReporter(Reporter rep) + * + */ + + /** + * Counter. + * + */ + public static enum Counter { + DESERIALIZE_ERRORS, + RECORDS_IN + } + + protected final transient LongWritable deserialize_error_count = new LongWritable(); + protected final transient LongWritable recordCounter = new LongWritable(); + protected transient long numRows = 0; + + private final Map<Integer, DummyStoreOperator> connectedOperators + = new TreeMap<Integer, DummyStoreOperator>(); + + private transient final Map<String, Path> normalizedPaths = new HashMap<String, Path>(); + + private Path normalizePath(String onefile, boolean schemaless) { + //creating Path is expensive, so cache the corresponding + //Path object in normalizedPaths + Path path = normalizedPaths.get(onefile); + if (path == null) { + path = new Path(onefile); + if (schemaless && path.toUri().getScheme() != null) { + path = new Path(path.toUri().getPath()); + } + normalizedPaths.put(onefile, path); + } + return path; + } + + protected String getNominalPath(Path fpath) { + String nominal = null; + boolean schemaless = fpath.toUri().getScheme() == null; + for (String onefile : conf.getPathToAliases().keySet()) { + Path onepath = normalizePath(onefile, schemaless); + Path curfpath = fpath; + if(!schemaless && onepath.toUri().getScheme() == null) { + curfpath = new Path(fpath.toUri().getPath()); + } + // check for the operators who will process rows coming to this Map Operator + if (onepath.toUri().relativize(curfpath.toUri()).equals(curfpath.toUri())) { + // not from this + continue; + } + if (nominal != null) { + throw new IllegalStateException("Ambiguous input path " + fpath); + } + nominal = onefile; + } + if (nominal == null) { + throw new IllegalStateException("Invalid input path " + fpath); + } + return nominal; + } + + public abstract void initEmptyInputChildren(List<Operator<?>> children, Configuration hconf) + throws SerDeException, Exception; + + + /** Kryo ctor. */ + protected AbstractMapOperator() { + super(); + } + + public AbstractMapOperator(CompilationOpContext ctx) { + super(ctx); + } + + public abstract void setChildren(Configuration hconf) throws Exception; + + + public void initializeMapOperator(Configuration hconf) throws HiveException { + // set that parent initialization is done and call initialize on children + state = State.INIT; + + statsMap.put(Counter.DESERIALIZE_ERRORS.toString(), deserialize_error_count); + + numRows = 0; + + String context = hconf.get(Operator.CONTEXT_NAME_KEY, ""); + if (context != null && !context.isEmpty()) { + context = "_" + context.replace(" ","_"); + } + statsMap.put(Counter.RECORDS_IN + context, recordCounter); + } + + public abstract void initializeContexts() throws HiveException; + + public abstract Deserializer getCurrentDeserializer(); + + public abstract void process(Writable value) throws HiveException; + + @Override + public void closeOp(boolean abort) throws HiveException { + recordCounter.set(numRows); + super.closeOp(abort); + } + + public void clearConnectedOperators() { + connectedOperators.clear(); + } + + public void setConnectedOperators(int tag, DummyStoreOperator dummyOp) { + connectedOperators.put(tag, dummyOp); + } + + public Map<Integer, DummyStoreOperator> getConnectedOperators() { + return connectedOperators; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/d5285d8e/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java index b1f9958..afe5ee2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java @@ -72,25 +72,11 @@ import com.google.common.annotations.VisibleForTesting; * Writable data structure from a Table (instead of a Hive Object). **/ @SuppressWarnings("deprecation") -public class MapOperator extends Operator<MapWork> implements Serializable, Cloneable { +public class MapOperator extends AbstractMapOperator { private static final long serialVersionUID = 1L; - /** - * Counter. - * - */ - public static enum Counter { - DESERIALIZE_ERRORS, - RECORDS_IN - } - - private final transient LongWritable deserialize_error_count = new LongWritable(); - private final transient LongWritable recordCounter = new LongWritable(); - protected transient long numRows = 0; protected transient long cntr = 1; - private final Map<Integer, DummyStoreOperator> connectedOperators - = new TreeMap<Integer, DummyStoreOperator>(); protected transient long logEveryNRows = 0; // input path --> {operator --> context} @@ -102,7 +88,6 @@ public class MapOperator extends Operator<MapWork> implements Serializable, Clon // context for current input file protected transient MapOpCtx[] currentCtxs; - private transient final Map<String, Path> normalizedPaths = new HashMap<String, Path>(); protected static class MapOpCtx { @@ -433,31 +418,6 @@ public class MapOperator extends Operator<MapWork> implements Serializable, Clon } } - private String getNominalPath(Path fpath) { - String nominal = null; - boolean schemaless = fpath.toUri().getScheme() == null; - for (String onefile : conf.getPathToAliases().keySet()) { - Path onepath = normalizePath(onefile, schemaless); - Path curfpath = fpath; - if(!schemaless && onepath.toUri().getScheme() == null) { - curfpath = new Path(fpath.toUri().getPath()); - } - // check for the operators who will process rows coming to this Map Operator - if (onepath.toUri().relativize(curfpath.toUri()).equals(curfpath.toUri())) { - // not from this - continue; - } - if (nominal != null) { - throw new IllegalStateException("Ambiguous input path " + fpath); - } - nominal = onefile; - } - if (nominal == null) { - throw new IllegalStateException("Invalid input path " + fpath); - } - return nominal; - } - /** Kryo ctor. */ protected MapOperator() { super(); @@ -473,32 +433,17 @@ public class MapOperator extends Operator<MapWork> implements Serializable, Clon } public void initializeMapOperator(Configuration hconf) throws HiveException { - // set that parent initialization is done and call initialize on children - state = State.INIT; - statsMap.put(Counter.DESERIALIZE_ERRORS.toString(), deserialize_error_count); + super.initializeMapOperator(hconf); - numRows = 0; cntr = 1; logEveryNRows = HiveConf.getLongVar(hconf, HiveConf.ConfVars.HIVE_LOG_N_RECORDS); - String context = hconf.get(Operator.CONTEXT_NAME_KEY, ""); - if (context != null && !context.isEmpty()) { - context = "_" + context.replace(" ","_"); - } - statsMap.put(Counter.RECORDS_IN + context, recordCounter); - for (Entry<Operator<?>, StructObjectInspector> entry : childrenOpToOI.entrySet()) { Operator<?> child = entry.getKey(); child.initialize(hconf, new ObjectInspector[] {entry.getValue()}); } } - @Override - public void closeOp(boolean abort) throws HiveException { - recordCounter.set(numRows); - super.closeOp(abort); - } - // Find context for current input file @Override public void cleanUpInputFileChangedOp() throws HiveException { @@ -528,20 +473,6 @@ public class MapOperator extends Operator<MapWork> implements Serializable, Clon currentCtxs = contexts.values().toArray(new MapOpCtx[contexts.size()]); } - private Path normalizePath(String onefile, boolean schemaless) { - //creating Path is expensive, so cache the corresponding - //Path object in normalizedPaths - Path path = normalizedPaths.get(onefile); - if (path == null) { - path = new Path(onefile); - if (schemaless && path.toUri().getScheme() != null) { - path = new Path(path.toUri().getPath()); - } - normalizedPaths.put(onefile, path); - } - return path; - } - public void process(Writable value) throws HiveException { // A mapper can span multiple files/partitions. // The serializers need to be reset if the input file changed @@ -698,17 +629,4 @@ public class MapOperator extends Operator<MapWork> implements Serializable, Clon return currentCtxs[0].deserializer; } - - public void clearConnectedOperators() { - connectedOperators.clear(); - } - - public void setConnectedOperators(int tag, DummyStoreOperator dummyOp) { - connectedOperators.put(tag, dummyOp); - } - - public Map<Integer, DummyStoreOperator> getConnectedOperators() { - return connectedOperators; - } - } http://git-wip-us.apache.org/repos/asf/hive/blob/d5285d8e/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java index ab0635e..449bef8 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java @@ -216,6 +216,7 @@ public final class Utilities { public static final String MAPRED_REDUCER_CLASS = "mapred.reducer.class"; public static final String HIVE_ADDED_JARS = "hive.added.jars"; public static final String VECTOR_MODE = "VECTOR_MODE"; + public static final String USE_VECTORIZED_INPUT_FILE_FORMAT = "USE_VECTORIZED_INPUT_FILE_FORMAT"; public static String MAPNAME = "Map "; public static String REDUCENAME = "Reducer "; @@ -3254,24 +3255,39 @@ public final class Utilities { /** * Returns true if a plan is both configured for vectorized execution - * and vectorization is allowed. The plan may be configured for vectorization + * and the node is vectorized and the Input File Format is marked VectorizedInputFileFormat. + * + * The plan may be configured for vectorization * but vectorization disallowed eg. for FetchOperator execution. */ - public static boolean isVectorMode(Configuration conf) { + public static boolean getUseVectorizedInputFileFormat(Configuration conf) { if (conf.get(VECTOR_MODE) != null) { // this code path is necessary, because with HS2 and client // side split generation we end up not finding the map work. // This is because of thread local madness (tez split // generation is multi-threaded - HS2 plan cache uses thread // locals). - return conf.getBoolean(VECTOR_MODE, false); + return + conf.getBoolean(VECTOR_MODE, false) && + conf.getBoolean(USE_VECTORIZED_INPUT_FILE_FORMAT, false); } else { - return HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED) - && Utilities.getPlanPath(conf) != null - && Utilities.getMapWork(conf).getVectorMode(); + if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED) && + Utilities.getPlanPath(conf) != null) { + MapWork mapWork = Utilities.getMapWork(conf); + return (mapWork.getVectorMode() && mapWork.getUseVectorizedInputFileFormat()); + } else { + return false; + } } } + + public static boolean getUseVectorizedInputFileFormat(Configuration conf, MapWork mapWork) { + return HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED) && + mapWork.getVectorMode() && + mapWork.getUseVectorizedInputFileFormat(); + } + /** * @param conf * @return the configured VectorizedRowBatchCtx for a MapWork task. @@ -3288,11 +3304,6 @@ public final class Utilities { return result; } - public static boolean isVectorMode(Configuration conf, MapWork mapWork) { - return HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED) - && mapWork.getVectorMode(); - } - public static void clearWorkMapForConf(Configuration conf) { // Remove cached query plans for the current query only Path mapPath = getPlanPath(conf, MAP_PLAN_NAME); http://git-wip-us.apache.org/repos/asf/hive/blob/d5285d8e/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java index c34dd1f..f90a788 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java @@ -29,6 +29,7 @@ import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.CompilationOpContext; +import org.apache.hadoop.hive.ql.exec.AbstractMapOperator; import org.apache.hadoop.hive.ql.exec.MapOperator; import org.apache.hadoop.hive.ql.exec.MapredContext; import org.apache.hadoop.hive.ql.exec.Operator; @@ -59,7 +60,7 @@ import org.apache.hadoop.util.StringUtils; */ public class ExecMapper extends MapReduceBase implements Mapper { - private MapOperator mo; + private AbstractMapOperator mo; private OutputCollector oc; private JobConf jc; private boolean abort = false; http://git-wip-us.apache.org/repos/asf/hive/blob/d5285d8e/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java index d8fe35f..48dfedc 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java @@ -25,6 +25,7 @@ import java.util.List; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.ql.CompilationOpContext; +import org.apache.hadoop.hive.ql.exec.AbstractMapOperator; import org.apache.hadoop.hive.ql.exec.MapOperator; import org.apache.hadoop.hive.ql.exec.MapredContext; import org.apache.hadoop.hive.ql.exec.Operator; @@ -55,7 +56,7 @@ import org.apache.hadoop.mapred.Reporter; */ public class SparkMapRecordHandler extends SparkRecordHandler { private static final Logger LOG = LoggerFactory.getLogger(SparkMapRecordHandler.class); - private MapOperator mo; + private AbstractMapOperator mo; private MapredLocalWork localWork = null; private boolean isLogInfoEnabled = false; private ExecMapperContext execContext; http://git-wip-us.apache.org/repos/asf/hive/blob/d5285d8e/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java index 79da860..a1e4e6c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java @@ -625,9 +625,10 @@ public class DagUtils { // generation we end up not finding the map work. This is // because of thread local madness (tez split generation is // multi-threaded - HS2 plan cache uses thread locals). Setting - // VECTOR_MODE causes the split gen code to use the conf instead + // VECTOR_MODE/USE_VECTORIZED_INPUT_FILE_FORMAT causes the split gen code to use the conf instead // of the map work. conf.setBoolean(Utilities.VECTOR_MODE, mapWork.getVectorMode()); + conf.setBoolean(Utilities.USE_VECTORIZED_INPUT_FILE_FORMAT, mapWork.getUseVectorizedInputFileFormat()); dataSource = MRInputHelpers.configureMRInputWithLegacySplitGeneration(conf, new Path(tezDir, "split_" + mapWork.getName().replaceAll(" ", "_")), true); http://git-wip-us.apache.org/repos/asf/hive/blob/d5285d8e/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java index 0584ad8..9a9f43a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java @@ -33,6 +33,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.AbstractMapOperator; import org.apache.hadoop.hive.llap.io.api.LlapProxy; import org.apache.hadoop.hive.ql.CompilationOpContext; import org.apache.hadoop.hive.ql.exec.DummyStoreOperator; @@ -75,8 +76,8 @@ public class MapRecordProcessor extends RecordProcessor { public static final Logger l4j = LoggerFactory.getLogger(MapRecordProcessor.class); protected static final String MAP_PLAN_KEY = "__MAP_PLAN__"; - private MapOperator mapOp; - private final List<MapOperator> mergeMapOpList = new ArrayList<MapOperator>(); + private AbstractMapOperator mapOp; + private final List<AbstractMapOperator> mergeMapOpList = new ArrayList<AbstractMapOperator>(); private MapRecordSource[] sources; private final Map<String, MultiMRInput> multiMRInputMap = new HashMap<String, MultiMRInput>(); private int position; @@ -183,7 +184,7 @@ public class MapRecordProcessor extends RecordProcessor { boolean fromCache = false; if (mergeWorkList != null) { - MapOperator mergeMapOp = null; + AbstractMapOperator mergeMapOp = null; for (BaseWork mergeWork : mergeWorkList) { MapWork mergeMapWork = (MapWork) mergeWork; if (mergeMapWork.getVectorMode()) { @@ -261,7 +262,7 @@ public class MapRecordProcessor extends RecordProcessor { initializeMapRecordSources(); mapOp.initializeMapOperator(jconf); if ((mergeMapOpList != null) && mergeMapOpList.isEmpty() == false) { - for (MapOperator mergeMapOp : mergeMapOpList) { + for (AbstractMapOperator mergeMapOp : mergeMapOpList) { jconf.set(Utilities.INPUT_NAME, mergeMapOp.getConf().getName()); mergeMapOp.initializeMapOperator(jconf); } @@ -309,7 +310,7 @@ public class MapRecordProcessor extends RecordProcessor { reader = legacyMRInput.getReader(); } sources[position].init(jconf, mapOp, reader); - for (MapOperator mapOp : mergeMapOpList) { + for (AbstractMapOperator mapOp : mergeMapOpList) { int tag = mapOp.getConf().getTag(); sources[tag] = new MapRecordSource(); String inputName = mapOp.getConf().getName(); @@ -326,7 +327,7 @@ public class MapRecordProcessor extends RecordProcessor { @SuppressWarnings("deprecation") private KeyValueReader getKeyValueReader(Collection<KeyValueReader> keyValueReaders, - MapOperator mapOp) + AbstractMapOperator mapOp) throws Exception { List<KeyValueReader> kvReaderList = new ArrayList<KeyValueReader>(keyValueReaders); // this sets up the map operator contexts correctly @@ -394,7 +395,7 @@ public class MapRecordProcessor extends RecordProcessor { } mapOp.close(abort); if (mergeMapOpList.isEmpty() == false) { - for (MapOperator mergeMapOp : mergeMapOpList) { + for (AbstractMapOperator mergeMapOp : mergeMapOpList) { mergeMapOp.close(abort); } } http://git-wip-us.apache.org/repos/asf/hive/blob/d5285d8e/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordSource.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordSource.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordSource.java index b53c933..add7d08 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordSource.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordSource.java @@ -21,7 +21,7 @@ package org.apache.hadoop.hive.ql.exec.tez; import java.io.IOException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hadoop.hive.ql.exec.MapOperator; +import org.apache.hadoop.hive.ql.exec.AbstractMapOperator; import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.io.Writable; @@ -39,11 +39,11 @@ public class MapRecordSource implements RecordSource { public static final Logger LOG = LoggerFactory.getLogger(MapRecordSource.class); private ExecMapperContext execContext = null; - private MapOperator mapOp = null; + private AbstractMapOperator mapOp = null; private KeyValueReader reader = null; private final boolean grouped = false; - void init(JobConf jconf, MapOperator mapOp, KeyValueReader reader) throws IOException { + void init(JobConf jconf, AbstractMapOperator mapOp, KeyValueReader reader) throws IOException { execContext = mapOp.getExecContext(); this.mapOp = mapOp; this.reader = reader; http://git-wip-us.apache.org/repos/asf/hive/blob/d5285d8e/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java index 1f75d07..e966ff1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java @@ -413,7 +413,7 @@ public class ReduceRecordSource implements RecordSource { // VectorizedBatchUtil.displayBytes(keyBytes, 0, keyLength)); keyBinarySortableDeserializeToRow.setBytes(keyBytes, 0, keyLength); - keyBinarySortableDeserializeToRow.deserializeByValue(batch, 0); + keyBinarySortableDeserializeToRow.deserialize(batch, 0); for(int i = 0; i < firstValueColumnOffset; i++) { VectorizedBatchUtil.setRepeatingColumn(batch, i); } @@ -431,7 +431,7 @@ public class ReduceRecordSource implements RecordSource { // VectorizedBatchUtil.displayBytes(valueBytes, 0, valueLength)); valueLazyBinaryDeserializeToRow.setBytes(valueBytes, 0, valueLength); - valueLazyBinaryDeserializeToRow.deserializeByValue(batch, rowIdx); + valueLazyBinaryDeserializeToRow.deserialize(batch, rowIdx); } rowIdx++; if (rowIdx >= BATCH_SIZE) { http://git-wip-us.apache.org/repos/asf/hive/blob/d5285d8e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAppMasterEventOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAppMasterEventOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAppMasterEventOperator.java index 1951569..2bf6ac5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAppMasterEventOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAppMasterEventOperator.java @@ -45,7 +45,7 @@ public class VectorAppMasterEventOperator extends AppMasterEventOperator { private transient boolean firstBatch; - private transient VectorExtractRowDynBatch vectorExtractRowDynBatch; + private transient VectorExtractRow vectorExtractRow; protected transient Object[] singleRow; @@ -88,16 +88,14 @@ public class VectorAppMasterEventOperator extends AppMasterEventOperator { VectorizedRowBatch batch = (VectorizedRowBatch) data; if (firstBatch) { - vectorExtractRowDynBatch = new VectorExtractRowDynBatch(); - vectorExtractRowDynBatch.init((StructObjectInspector) inputObjInspectors[0], vContext.getProjectedColumns()); + vectorExtractRow = new VectorExtractRow(); + vectorExtractRow.init((StructObjectInspector) inputObjInspectors[0], vContext.getProjectedColumns()); - singleRow = new Object[vectorExtractRowDynBatch.getCount()]; + singleRow = new Object[vectorExtractRow.getCount()]; firstBatch = false; } - vectorExtractRowDynBatch.setBatchOnEntry(batch); - ObjectInspector rowInspector = inputObjInspectors[0]; try { Writable writableRow; @@ -105,7 +103,7 @@ public class VectorAppMasterEventOperator extends AppMasterEventOperator { int selected[] = batch.selected; for (int logical = 0 ; logical < batch.size; logical++) { int batchIndex = selected[logical]; - vectorExtractRowDynBatch.extractRow(batchIndex, singleRow); + vectorExtractRow.extractRow(batch, batchIndex, singleRow); writableRow = serializer.serialize(singleRow, rowInspector); writableRow.write(buffer); if (buffer.getLength() > MAX_SIZE) { @@ -117,7 +115,7 @@ public class VectorAppMasterEventOperator extends AppMasterEventOperator { } } else { for (int batchIndex = 0 ; batchIndex < batch.size; batchIndex++) { - vectorExtractRowDynBatch.extractRow(batchIndex, singleRow); + vectorExtractRow.extractRow(batch, batchIndex, singleRow); writableRow = serializer.serialize(singleRow, rowInspector); writableRow.write(buffer); if (buffer.getLength() > MAX_SIZE) { @@ -133,7 +131,5 @@ public class VectorAppMasterEventOperator extends AppMasterEventOperator { } forward(data, rowInspector); - - vectorExtractRowDynBatch.forgetBatchOnExit(); } }
