HIVE-18749: Need to replace transactionId with writeId in RecordIdentifier and other relevant contexts (Sankar Hariappan, reviewed by Eugene Koifman)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/8f93ca0b Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/8f93ca0b Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/8f93ca0b Branch: refs/heads/master Commit: 8f93ca0b5041bc988e5d773523a65d3ef70c19bb Parents: 8d88cfa Author: Sankar Hariappan <sank...@apache.org> Authored: Tue Mar 6 11:19:10 2018 +0530 Committer: Sankar Hariappan <sank...@apache.org> Committed: Tue Mar 6 11:19:10 2018 +0530 ---------------------------------------------------------------------- .../apache/hadoop/hive/common/JavaUtils.java | 4 +- .../hive/hcatalog/streaming/HiveEndPoint.java | 2 +- .../hive/hcatalog/streaming/mutate/package.html | 4 +- .../mutate/worker/MutatorCoordinator.java | 4 +- .../hive/hcatalog/streaming/TestStreaming.java | 18 +- .../streaming/mutate/ExampleUseCase.java | 2 +- .../streaming/mutate/StreamingAssert.java | 8 +- .../streaming/mutate/TestMutations.java | 32 +-- .../apache/hadoop/hive/ql/TestAcidOnTez.java | 82 +++--- .../hive/ql/txn/compactor/TestCompactor.java | 4 +- .../org/apache/hadoop/hive/ql/QTestUtil.java | 6 +- .../apache/hadoop/hive/ql/exec/Utilities.java | 52 ++-- .../ql/exec/vector/VectorizedRowBatchCtx.java | 2 +- .../hadoop/hive/ql/io/AcidInputFormat.java | 24 +- .../org/apache/hadoop/hive/ql/io/AcidUtils.java | 20 +- .../hadoop/hive/ql/io/RecordIdentifier.java | 28 +- .../hadoop/hive/ql/io/orc/OrcInputFormat.java | 4 +- .../hive/ql/io/orc/OrcRawRecordMerger.java | 44 +-- .../io/orc/VectorizedOrcAcidRowBatchReader.java | 216 +++++++------- .../apache/hadoop/hive/ql/metadata/Hive.java | 16 +- .../hive/ql/parse/BaseSemanticAnalyzer.java | 2 +- .../hive/ql/txn/compactor/CompactorMR.java | 2 +- .../apache/hadoop/hive/ql/TestTxnCommands.java | 24 +- .../apache/hadoop/hive/ql/TestTxnCommands2.java | 18 +- .../apache/hadoop/hive/ql/TestTxnLoadData.java | 142 ++++----- .../apache/hadoop/hive/ql/TestTxnNoBuckets.java | 202 
++++++------- .../hive/ql/exec/TestFileSinkOperator.java | 6 +- .../hive/ql/io/orc/TestInputOutputFormat.java | 2 +- .../hive/ql/io/orc/TestOrcRawRecordMerger.java | 4 +- .../TestVectorizedOrcAcidRowBatchReader.java | 36 +-- ql/src/test/queries/clientpositive/row__id.q | 8 +- .../clientpositive/schema_evol_orc_acid_part.q | 2 +- .../schema_evol_orc_acid_part_llap_io.q | 2 +- .../schema_evol_orc_acid_part_update.q | 2 +- .../schema_evol_orc_acid_part_update_llap_io.q | 2 +- .../clientpositive/schema_evol_orc_acid_table.q | 2 +- .../schema_evol_orc_acid_table_llap_io.q | 2 +- .../schema_evol_orc_acid_table_update.q | 2 +- .../schema_evol_orc_acid_table_update_llap_io.q | 2 +- .../schema_evol_orc_acidvec_part.q | 2 +- .../schema_evol_orc_acidvec_part_llap_io.q | 2 +- .../schema_evol_orc_acidvec_part_update.q | 2 +- ...chema_evol_orc_acidvec_part_update_llap_io.q | 2 +- .../schema_evol_orc_acidvec_table.q | 2 +- .../schema_evol_orc_acidvec_table_llap_io.q | 2 +- .../schema_evol_orc_acidvec_table_update.q | 2 +- ...hema_evol_orc_acidvec_table_update_llap_io.q | 2 +- .../invalid_cast_from_binary_1.q.out | 2 +- .../results/clientpositive/acid_subquery.q.out | 2 +- .../clientpositive/llap/acid_no_buckets.q.out | 64 ++--- .../llap/acid_vectorization_original.q.out | 46 +-- .../llap/dynamic_semijoin_reduction_3.q.out | 50 ++-- .../llap/dynpart_sort_optimization_acid.q.out | 72 ++--- .../llap/enforce_constraint_notnull.q.out | 104 +++---- .../results/clientpositive/llap/llap_acid.q.out | 8 +- .../clientpositive/llap/llap_acid_fast.q.out | 8 +- .../clientpositive/llap/llap_partitioned.q.out | 6 +- .../results/clientpositive/llap/mergejoin.q.out | 60 ++-- .../llap/schema_evol_orc_acidvec_part.q.out | 28 +- .../schema_evol_orc_acidvec_part_llap_io.q.out | 28 +- .../llap/schema_evol_orc_acidvec_table.q.out | 28 +- .../schema_evol_orc_acidvec_table_llap_io.q.out | 28 +- .../llap/schema_evol_orc_vec_part.q.out | 18 +- .../schema_evol_orc_vec_part_all_complex.q.out | 6 +- 
..._evol_orc_vec_part_all_complex_llap_io.q.out | 6 +- ...schema_evol_orc_vec_part_all_primitive.q.out | 10 +- ...vol_orc_vec_part_all_primitive_llap_io.q.out | 10 +- .../llap/schema_evol_orc_vec_part_llap_io.q.out | 18 +- .../llap/schema_evol_orc_vec_table.q.out | 10 +- .../schema_evol_orc_vec_table_llap_io.q.out | 10 +- .../llap/schema_evol_text_vec_part.q.out | 18 +- .../schema_evol_text_vec_part_all_complex.q.out | 6 +- ...evol_text_vec_part_all_complex_llap_io.q.out | 6 +- ...chema_evol_text_vec_part_all_primitive.q.out | 10 +- ...ol_text_vec_part_all_primitive_llap_io.q.out | 10 +- .../schema_evol_text_vec_part_llap_io.q.out | 18 +- .../llap/schema_evol_text_vec_table.q.out | 10 +- .../schema_evol_text_vec_table_llap_io.q.out | 18 +- .../llap/schema_evol_text_vecrow_part.q.out | 18 +- ...hema_evol_text_vecrow_part_all_complex.q.out | 6 +- ...l_text_vecrow_part_all_complex_llap_io.q.out | 6 +- ...ma_evol_text_vecrow_part_all_primitive.q.out | 10 +- ...text_vecrow_part_all_primitive_llap_io.q.out | 10 +- .../schema_evol_text_vecrow_part_llap_io.q.out | 18 +- .../llap/schema_evol_text_vecrow_table.q.out | 10 +- .../schema_evol_text_vecrow_table_llap_io.q.out | 18 +- .../results/clientpositive/llap/sqlmerge.q.out | 24 +- .../llap/vector_aggregate_9.q.out | 6 +- .../llap/vector_aggregate_without_gby.q.out | 2 +- .../clientpositive/llap/vector_bround.q.out | 2 +- .../llap/vector_case_when_1.q.out | 4 +- .../llap/vector_case_when_2.q.out | 4 +- .../llap/vector_char_varchar_1.q.out | 4 +- .../clientpositive/llap/vector_coalesce_3.q.out | 4 +- .../clientpositive/llap/vector_coalesce_4.q.out | 2 +- .../llap/vector_complex_all.q.out | 28 +- .../clientpositive/llap/vector_date_1.q.out | 14 +- .../clientpositive/llap/vector_decimal_1.q.out | 18 +- .../llap/vector_decimal_10_0.q.out | 4 +- .../clientpositive/llap/vector_decimal_2.q.out | 54 ++-- .../clientpositive/llap/vector_decimal_6.q.out | 10 +- .../llap/vector_decimal_aggregate.q.out | 8 +- 
.../llap/vector_decimal_cast.q.out | 4 +- .../llap/vector_decimal_expressions.q.out | 4 +- .../llap/vector_decimal_mapjoin.q.out | 24 +- .../llap/vector_decimal_math_funcs.q.out | 4 +- .../llap/vector_decimal_precision.q.out | 4 +- .../llap/vector_decimal_round.q.out | 12 +- .../llap/vector_decimal_round_2.q.out | 8 +- .../llap/vector_decimal_trailing.q.out | 2 +- .../llap/vector_decimal_udf.q.out | 120 ++++---- .../llap/vector_decimal_udf2.q.out | 8 +- .../llap/vector_groupby_grouping_id1.q.out | 12 +- .../llap/vector_groupby_grouping_id2.q.out | 18 +- .../llap/vector_groupby_grouping_id3.q.out | 4 +- .../llap/vector_groupby_grouping_sets1.q.out | 14 +- .../llap/vector_groupby_grouping_sets2.q.out | 6 +- .../vector_groupby_grouping_sets3_dec.q.out | 6 +- .../llap/vector_groupby_grouping_sets4.q.out | 6 +- .../llap/vector_groupby_grouping_sets5.q.out | 6 +- .../llap/vector_groupby_grouping_sets6.q.out | 4 +- .../vector_groupby_grouping_sets_grouping.q.out | 24 +- .../vector_groupby_grouping_sets_limit.q.out | 12 +- .../llap/vector_groupby_grouping_window.q.out | 2 +- .../llap/vector_groupby_rollup1.q.out | 10 +- .../clientpositive/llap/vector_inner_join.q.out | 36 +-- .../llap/vector_leftsemi_mapjoin.q.out | 288 +++++++++---------- .../clientpositive/llap/vector_like_2.q.out | 2 +- .../llap/vector_llap_text_1.q.out | 4 +- .../clientpositive/llap/vector_order_null.q.out | 22 +- .../llap/vector_outer_join0.q.out | 8 +- .../llap/vector_outer_join1.q.out | 14 +- .../llap/vector_outer_join2.q.out | 6 +- .../llap/vector_outer_reference_windowed.q.out | 32 +-- .../clientpositive/llap/vector_ptf_1.q.out | 2 +- .../llap/vector_ptf_part_simple.q.out | 60 ++-- .../vector_reduce_groupby_duplicate_cols.q.out | 2 +- .../llap/vector_reuse_scratchcols.q.out | 4 +- .../clientpositive/llap/vector_udf1.q.out | 56 ++-- .../llap/vector_udf_adaptor_1.q.out | 8 +- .../clientpositive/llap/vector_windowing.q.out | 94 +++--- .../llap/vector_windowing_expressions.q.out | 20 +- 
.../llap/vector_windowing_gby.q.out | 4 +- .../llap/vector_windowing_gby2.q.out | 10 +- .../vector_windowing_multipartitioning.q.out | 12 +- .../llap/vector_windowing_navfn.q.out | 22 +- .../llap/vector_windowing_order_null.q.out | 16 +- .../vector_windowing_range_multiorder.q.out | 22 +- .../llap/vector_windowing_rank.q.out | 20 +- .../llap/vector_windowing_streaming.q.out | 8 +- .../llap/vector_windowing_windowspec.q.out | 22 +- .../llap/vector_windowing_windowspec4.q.out | 2 +- .../clientpositive/llap/vectorization_0.q.out | 14 +- .../clientpositive/llap/vectorization_1.q.out | 2 +- .../clientpositive/llap/vectorization_10.q.out | 2 +- .../clientpositive/llap/vectorization_11.q.out | 2 +- .../clientpositive/llap/vectorization_12.q.out | 2 +- .../clientpositive/llap/vectorization_13.q.out | 2 +- .../clientpositive/llap/vectorization_14.q.out | 2 +- .../clientpositive/llap/vectorization_15.q.out | 2 +- .../clientpositive/llap/vectorization_16.q.out | 2 +- .../clientpositive/llap/vectorization_17.q.out | 2 +- .../clientpositive/llap/vectorization_2.q.out | 2 +- .../clientpositive/llap/vectorization_3.q.out | 2 +- .../clientpositive/llap/vectorization_4.q.out | 2 +- .../clientpositive/llap/vectorization_5.q.out | 2 +- .../clientpositive/llap/vectorization_6.q.out | 2 +- .../clientpositive/llap/vectorization_7.q.out | 2 +- .../clientpositive/llap/vectorization_8.q.out | 2 +- .../clientpositive/llap/vectorization_9.q.out | 2 +- .../llap/vectorization_nested_udf.q.out | 2 +- .../clientpositive/llap/vectorized_case.q.out | 20 +- .../clientpositive/llap/vectorized_casts.q.out | 2 +- .../llap/vectorized_mapjoin3.q.out | 12 +- .../clientpositive/llap/vectorized_ptf.q.out | 50 ++-- .../llap/vectorized_timestamp.q.out | 8 +- .../test/results/clientpositive/llap_acid.q.out | 6 +- .../results/clientpositive/llap_acid_fast.q.out | 6 +- .../test/results/clientpositive/masking_7.q.out | 12 +- .../test/results/clientpositive/masking_8.q.out | 6 +- 
.../test/results/clientpositive/masking_9.q.out | 2 +- .../masking_acid_no_masking.q.out | 2 +- .../test/results/clientpositive/mergejoin.q.out | 4 +- .../parquet_vectorization_0.q.out | 14 +- .../parquet_vectorization_1.q.out | 2 +- .../parquet_vectorization_10.q.out | 2 +- .../parquet_vectorization_11.q.out | 2 +- .../parquet_vectorization_12.q.out | 2 +- .../parquet_vectorization_13.q.out | 2 +- .../parquet_vectorization_14.q.out | 2 +- .../parquet_vectorization_15.q.out | 2 +- .../parquet_vectorization_16.q.out | 2 +- .../parquet_vectorization_17.q.out | 2 +- .../parquet_vectorization_2.q.out | 2 +- .../parquet_vectorization_3.q.out | 2 +- .../parquet_vectorization_4.q.out | 2 +- .../parquet_vectorization_5.q.out | 2 +- .../parquet_vectorization_6.q.out | 2 +- .../parquet_vectorization_7.q.out | 2 +- .../parquet_vectorization_8.q.out | 2 +- .../parquet_vectorization_9.q.out | 2 +- .../parquet_vectorization_limit.q.out | 10 +- .../test/results/clientpositive/row__id.q.out | 22 +- .../spark/parquet_vectorization_0.q.out | 14 +- .../spark/parquet_vectorization_1.q.out | 2 +- .../spark/parquet_vectorization_10.q.out | 2 +- .../spark/parquet_vectorization_11.q.out | 2 +- .../spark/parquet_vectorization_12.q.out | 2 +- .../spark/parquet_vectorization_13.q.out | 2 +- .../spark/parquet_vectorization_14.q.out | 2 +- .../spark/parquet_vectorization_15.q.out | 2 +- .../spark/parquet_vectorization_16.q.out | 2 +- .../spark/parquet_vectorization_17.q.out | 2 +- .../spark/parquet_vectorization_2.q.out | 2 +- .../spark/parquet_vectorization_3.q.out | 2 +- .../spark/parquet_vectorization_4.q.out | 2 +- .../spark/parquet_vectorization_5.q.out | 2 +- .../spark/parquet_vectorization_6.q.out | 2 +- .../spark/parquet_vectorization_7.q.out | 2 +- .../spark/parquet_vectorization_8.q.out | 2 +- .../spark/parquet_vectorization_9.q.out | 2 +- .../spark/parquet_vectorization_limit.q.out | 10 +- ...k_vectorized_dynamic_partition_pruning.q.out | 242 ++++++++-------- 
.../spark/vector_decimal_aggregate.q.out | 8 +- .../spark/vector_decimal_mapjoin.q.out | 24 +- .../spark/vector_inner_join.q.out | 36 +-- .../spark/vector_outer_join0.q.out | 8 +- .../spark/vector_outer_join1.q.out | 14 +- .../spark/vector_outer_join2.q.out | 6 +- .../clientpositive/spark/vectorization_0.q.out | 14 +- .../clientpositive/spark/vectorization_1.q.out | 2 +- .../clientpositive/spark/vectorization_10.q.out | 2 +- .../clientpositive/spark/vectorization_11.q.out | 2 +- .../clientpositive/spark/vectorization_12.q.out | 2 +- .../clientpositive/spark/vectorization_13.q.out | 2 +- .../clientpositive/spark/vectorization_14.q.out | 2 +- .../clientpositive/spark/vectorization_15.q.out | 2 +- .../clientpositive/spark/vectorization_16.q.out | 2 +- .../clientpositive/spark/vectorization_17.q.out | 2 +- .../clientpositive/spark/vectorization_2.q.out | 2 +- .../clientpositive/spark/vectorization_3.q.out | 2 +- .../clientpositive/spark/vectorization_4.q.out | 2 +- .../clientpositive/spark/vectorization_5.q.out | 2 +- .../clientpositive/spark/vectorization_6.q.out | 2 +- .../clientpositive/spark/vectorization_9.q.out | 2 +- .../spark/vectorization_nested_udf.q.out | 2 +- .../clientpositive/spark/vectorized_case.q.out | 20 +- .../clientpositive/spark/vectorized_ptf.q.out | 50 ++-- .../tez/acid_vectorization_original_tez.q.out | 58 ++-- .../tez/vectorization_limit.q.out | 10 +- .../clientpositive/vector_aggregate_9.q.out | 6 +- .../vector_aggregate_without_gby.q.out | 2 +- .../results/clientpositive/vector_bround.q.out | 2 +- .../clientpositive/vector_case_when_1.q.out | 4 +- .../clientpositive/vector_case_when_2.q.out | 4 +- .../clientpositive/vector_coalesce_3.q.out | 2 +- .../clientpositive/vector_coalesce_4.q.out | 2 +- .../results/clientpositive/vector_date_1.q.out | 14 +- .../clientpositive/vector_decimal_1.q.out | 18 +- .../clientpositive/vector_decimal_10_0.q.out | 4 +- .../clientpositive/vector_decimal_6.q.out | 6 +- .../vector_decimal_aggregate.q.out | 8 +- 
.../clientpositive/vector_decimal_cast.q.out | 4 +- .../vector_decimal_expressions.q.out | 4 +- .../clientpositive/vector_decimal_mapjoin.q.out | 12 +- .../vector_decimal_math_funcs.q.out | 4 +- .../vector_decimal_precision.q.out | 4 +- .../clientpositive/vector_decimal_round.q.out | 12 +- .../clientpositive/vector_decimal_round_2.q.out | 8 +- .../vector_decimal_trailing.q.out | 2 +- .../clientpositive/vector_decimal_udf2.q.out | 8 +- .../clientpositive/vector_outer_join0.q.out | 4 +- .../clientpositive/vector_outer_join1.q.out | 6 +- .../clientpositive/vector_outer_join2.q.out | 2 +- .../clientpositive/vector_outer_join3.q.out | 6 +- .../clientpositive/vector_outer_join4.q.out | 6 +- .../clientpositive/vector_outer_join6.q.out | 4 +- .../vector_outer_join_no_keys.q.out | 4 +- .../vector_reduce_groupby_duplicate_cols.q.out | 2 +- .../vector_tablesample_rows.q.out | 4 +- .../clientpositive/vectorization_1.q.out | 2 +- .../clientpositive/vectorization_10.q.out | 2 +- .../clientpositive/vectorization_11.q.out | 2 +- .../clientpositive/vectorization_12.q.out | 2 +- .../clientpositive/vectorization_13.q.out | 2 +- .../clientpositive/vectorization_14.q.out | 2 +- .../clientpositive/vectorization_15.q.out | 2 +- .../clientpositive/vectorization_16.q.out | 2 +- .../clientpositive/vectorization_17.q.out | 2 +- .../clientpositive/vectorization_2.q.out | 2 +- .../clientpositive/vectorization_3.q.out | 2 +- .../clientpositive/vectorization_4.q.out | 2 +- .../clientpositive/vectorization_5.q.out | 2 +- .../clientpositive/vectorization_6.q.out | 2 +- .../clientpositive/vectorization_7.q.out | 2 +- .../clientpositive/vectorization_8.q.out | 2 +- .../clientpositive/vectorization_9.q.out | 2 +- .../clientpositive/vectorization_limit.q.out | 10 +- .../vectorization_nested_udf.q.out | 2 +- .../clientpositive/vectorized_case.q.out | 20 +- .../clientpositive/vectorized_casts.q.out | 2 +- .../vectorized_distinct_gby.q.out | 4 +- .../clientpositive/vectorized_mapjoin3.q.out | 6 +- 
.../clientpositive/vectorized_timestamp.q.out | 8 +- .../hive/common/ValidCompactorWriteIdList.java | 2 +- .../hadoop/hive/common/ValidWriteIdList.java | 4 +- 306 files changed, 2131 insertions(+), 2131 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/8f93ca0b/common/src/java/org/apache/hadoop/hive/common/JavaUtils.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/common/JavaUtils.java b/common/src/java/org/apache/hadoop/hive/common/JavaUtils.java index de0c283..75c07b4 100644 --- a/common/src/java/org/apache/hadoop/hive/common/JavaUtils.java +++ b/common/src/java/org/apache/hadoop/hive/common/JavaUtils.java @@ -171,7 +171,7 @@ public final class JavaUtils { String fileName = file.getName(); String[] parts = fileName.split("_", 4); // e.g. delta_0000001_0000001_0000 or base_0000022 if (parts.length < 2 || !(DELTA_PREFIX.equals(parts[0]) || BASE_PREFIX.equals(parts[0]))) { - LOG.debug("Cannot extract transaction ID for a MM table: " + file + LOG.debug("Cannot extract write ID for a MM table: " + file + " (" + Arrays.toString(parts) + ")"); return null; } @@ -179,7 +179,7 @@ public final class JavaUtils { try { writeId = Long.parseLong(parts[1]); } catch (NumberFormatException ex) { - LOG.debug("Cannot extract transaction ID for a MM table: " + file + LOG.debug("Cannot extract write ID for a MM table: " + file + "; parsing " + parts[1] + " got " + ex.getMessage()); return null; } http://git-wip-us.apache.org/repos/asf/hive/blob/8f93ca0b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java index 90731dc..6d248ea 100644 --- 
a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java +++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java @@ -973,7 +973,7 @@ public class HiveEndPoint { msClient.rollbackTxn(txnToWriteIds.get(currentTxnIndex).getTxnId()); txnStatus[currentTxnIndex] = TxnState.ABORTED; } - currentTxnIndex--;//since the loop left it == txnId.size() + currentTxnIndex--;//since the loop left it == txnToWriteIds.size() } else { if (getCurrentTxnId() > 0) { http://git-wip-us.apache.org/repos/asf/hive/blob/8f93ca0b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/package.html ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/package.html b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/package.html index 72ce6b1..d133c46 100644 --- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/package.html +++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/package.html @@ -396,7 +396,7 @@ unpartitioned table you should simply pass an empty list as the partition value. For inserts specifically, only the bucket id will be extracted from the <code>RecordIdentifier</code> -, the transactionId and rowId will be ignored and replaced by +, the writeId and rowId will be ignored and replaced by appropriate values in the <code>RecordUpdater</code> . Additionally, in the case of deletes, everything but the @@ -409,7 +409,7 @@ submit the original record. <b>Caution:</b> As mentioned previously, mutations must arrive in specific order for the resultant table data to be consistent. Coordinators will verify a naturally ordered sequence of -(lastTransactionId, rowId) and will throw an exception if this sequence +(writeId, rowId) and will throw an exception if this sequence is broken. 
This exception should almost certainly be escalated so that the transaction is aborted. This, along with the correct ordering of the data, is the responsibility of the client using the API. http://git-wip-us.apache.org/repos/asf/hive/blob/8f93ca0b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorCoordinator.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorCoordinator.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorCoordinator.java index 5e804d7..ad14c72 100644 --- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorCoordinator.java +++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/worker/MutatorCoordinator.java @@ -42,14 +42,14 @@ import org.slf4j.LoggerFactory; * Orchestrates the application of an ordered sequence of mutation events to a given ACID table. Events must be grouped * by partition, then bucket and ordered by origTxnId, then rowId. Ordering is enforced by the {@link SequenceValidator} * and grouping is by the {@link GroupingValidator}. An acid delta file is created for each combination partition, and - * bucket id (a single transaction id is implied). Once a delta file has been closed it cannot be reopened. Therefore + * bucket id (a single write id is implied). Once a delta file has been closed it cannot be reopened. Therefore * care is needed as to group the data correctly otherwise failures will occur if a delta belonging to group has been * previously closed. The {@link MutatorCoordinator} will seamlessly handle transitions between groups, creating and * closing {@link Mutator Mutators} as needed to write to the appropriate partition and bucket. New partitions will be * created in the meta store if {@link AcidTable#createPartitions()} is set. 
* <p/> * {@link #insert(List, Object) Insert} events must be artificially assigned appropriate bucket ids in the preceding - * grouping phase so that they are grouped correctly. Note that any transaction id or row id assigned to the + * grouping phase so that they are grouped correctly. Note that any write id or row id assigned to the * {@link RecordIdentifier RecordIdentifier} of such events will be ignored by both the coordinator and the underlying * {@link RecordUpdater}. */ http://git-wip-us.apache.org/repos/asf/hive/blob/8f93ca0b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java index 805fddb..a9ab90b 100644 --- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java +++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java @@ -375,15 +375,15 @@ public class TestStreaming { Assert.assertEquals("", 0, BucketCodec.determineVersion(536870912).decodeWriterId(536870912)); rs = queryTable(driver,"select ROW__ID, a, b, INPUT__FILE__NAME from default.streamingnobuckets order by ROW__ID"); - Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"transactionid\":1,\"bucketid\":536870912,\"rowid\":0}\tfoo\tbar")); + Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\tfoo\tbar")); Assert.assertTrue(rs.get(0), rs.get(0).endsWith("streamingnobuckets/delta_0000001_0000001_0000/bucket_00000")); - Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"transactionid\":2,\"bucketid\":536870912,\"rowid\":0}\ta1\tb2")); + Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\ta1\tb2")); Assert.assertTrue(rs.get(1), 
rs.get(1).endsWith("streamingnobuckets/delta_0000002_0000003/bucket_00000")); - Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"transactionid\":2,\"bucketid\":536870912,\"rowid\":1}\ta3\tb4")); + Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\ta3\tb4")); Assert.assertTrue(rs.get(2), rs.get(2).endsWith("streamingnobuckets/delta_0000002_0000003/bucket_00000")); - Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"transactionid\":3,\"bucketid\":536870912,\"rowid\":0}\ta5\tb6")); + Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\ta5\tb6")); Assert.assertTrue(rs.get(3), rs.get(3).endsWith("streamingnobuckets/delta_0000002_0000003/bucket_00000")); - Assert.assertTrue(rs.get(4), rs.get(4).startsWith("{\"transactionid\":3,\"bucketid\":536870912,\"rowid\":1}\ta7\tb8")); + Assert.assertTrue(rs.get(4), rs.get(4).startsWith("{\"writeid\":3,\"bucketid\":536870912,\"rowid\":1}\ta7\tb8")); Assert.assertTrue(rs.get(4), rs.get(4).endsWith("streamingnobuckets/delta_0000002_0000003/bucket_00000")); queryTable(driver, "update default.streamingnobuckets set a=0, b=0 where a='a7'"); @@ -399,13 +399,13 @@ public class TestStreaming { runWorker(conf); rs = queryTable(driver,"select ROW__ID, a, b, INPUT__FILE__NAME from default.streamingnobuckets order by ROW__ID"); - Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"transactionid\":1,\"bucketid\":536870912,\"rowid\":0}\tfoo\tbar")); + Assert.assertTrue(rs.get(0), rs.get(0).startsWith("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\tfoo\tbar")); Assert.assertTrue(rs.get(0), rs.get(0).endsWith("streamingnobuckets/base_0000005/bucket_00000")); - Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"transactionid\":2,\"bucketid\":536870912,\"rowid\":1}\ta3\tb4")); + Assert.assertTrue(rs.get(1), rs.get(1).startsWith("{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\ta3\tb4")); Assert.assertTrue(rs.get(1), 
rs.get(1).endsWith("streamingnobuckets/base_0000005/bucket_00000")); - Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"transactionid\":3,\"bucketid\":536870912,\"rowid\":0}\ta5\tb6")); + Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\ta5\tb6")); Assert.assertTrue(rs.get(2), rs.get(2).endsWith("streamingnobuckets/base_0000005/bucket_00000")); - Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"transactionid\":4,\"bucketid\":536870912,\"rowid\":0}\t0\t0")); + Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"writeid\":4,\"bucketid\":536870912,\"rowid\":0}\t0\t0")); Assert.assertTrue(rs.get(3), rs.get(3).endsWith("streamingnobuckets/base_0000005/bucket_00000")); } http://git-wip-us.apache.org/repos/asf/hive/blob/8f93ca0b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/ExampleUseCase.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/ExampleUseCase.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/ExampleUseCase.java index 8f37c83..d38950e 100644 --- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/ExampleUseCase.java +++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/ExampleUseCase.java @@ -70,7 +70,7 @@ public class ExampleUseCase { // DATA SHOULD GET SORTED BY YOUR ETL/MERGE PROCESS HERE // // Group the data by (partitionValues, ROW__ID.bucketId) - // Order the groups by (ROW__ID.lastTransactionId, ROW__ID.rowId) + // Order the groups by (ROW__ID.writeId, ROW__ID.rowId) // -------------------------------------------------------------- // One of these runs at the output of each reducer http://git-wip-us.apache.org/repos/asf/hive/blob/8f93ca0b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingAssert.java 
---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingAssert.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingAssert.java index 2aa8674..0edf1cd 100644 --- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingAssert.java +++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingAssert.java @@ -108,18 +108,18 @@ public class StreamingAssert { assertExpectedFileCount(0); } - public void assertMinTransactionId(long expectedMinTransactionId) { + public void assertMinWriteId(long expectedMinWriteId) { if (currentDeltas.isEmpty()) { throw new AssertionError("No data"); } - assertEquals(expectedMinTransactionId, min); + assertEquals(expectedMinWriteId, min); } - public void assertMaxTransactionId(long expectedMaxTransactionId) { + public void assertMaxWriteId(long expectedMaxWriteId) { if (currentDeltas.isEmpty()) { throw new AssertionError("No data"); } - assertEquals(expectedMaxTransactionId, max); + assertEquals(expectedMaxWriteId, max); } List<Record> readRecords() throws Exception { http://git-wip-us.apache.org/repos/asf/hive/blob/8f93ca0b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/TestMutations.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/TestMutations.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/TestMutations.java index 15f84e9..3d008e6 100644 --- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/TestMutations.java +++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/TestMutations.java @@ -241,8 +241,8 @@ public class TestMutations { transaction.commit(); StreamingAssert streamingAssertions = assertionFactory.newStreamingAssert(table, ASIA_INDIA); - 
streamingAssertions.assertMinTransactionId(1L); - streamingAssertions.assertMaxTransactionId(1L); + streamingAssertions.assertMinWriteId(1L); + streamingAssertions.assertMaxWriteId(1L); streamingAssertions.assertExpectedFileCount(1); List<Record> readRecords = streamingAssertions.readRecords(); @@ -299,8 +299,8 @@ public class TestMutations { // ASIA_INDIA StreamingAssert streamingAssertions = assertionFactory.newStreamingAssert(table, ASIA_INDIA); - streamingAssertions.assertMinTransactionId(1L); - streamingAssertions.assertMaxTransactionId(1L); + streamingAssertions.assertMinWriteId(1L); + streamingAssertions.assertMaxWriteId(1L); streamingAssertions.assertExpectedFileCount(1); List<Record> readRecords = streamingAssertions.readRecords(); @@ -311,8 +311,8 @@ public class TestMutations { // EUROPE_UK streamingAssertions = assertionFactory.newStreamingAssert(table, EUROPE_UK); - streamingAssertions.assertMinTransactionId(1L); - streamingAssertions.assertMaxTransactionId(1L); + streamingAssertions.assertMinWriteId(1L); + streamingAssertions.assertMaxWriteId(1L); streamingAssertions.assertExpectedFileCount(1); readRecords = streamingAssertions.readRecords(); @@ -323,8 +323,8 @@ public class TestMutations { // EUROPE_FRANCE streamingAssertions = assertionFactory.newStreamingAssert(table, EUROPE_FRANCE); - streamingAssertions.assertMinTransactionId(1L); - streamingAssertions.assertMaxTransactionId(1L); + streamingAssertions.assertMinWriteId(1L); + streamingAssertions.assertMaxWriteId(1L); streamingAssertions.assertExpectedFileCount(1); readRecords = streamingAssertions.readRecords(); @@ -373,8 +373,8 @@ public class TestMutations { transaction.commit(); StreamingAssert streamingAssertions = assertionFactory.newStreamingAssert(table); - streamingAssertions.assertMinTransactionId(1L); - streamingAssertions.assertMaxTransactionId(1L); + streamingAssertions.assertMinWriteId(1L); + streamingAssertions.assertMaxWriteId(1L); streamingAssertions.assertExpectedFileCount(1); 
List<Record> readRecords = streamingAssertions.readRecords(); @@ -527,8 +527,8 @@ public class TestMutations { assertThat(mutateTransaction.getState(), is(COMMITTED)); StreamingAssert indiaAssertions = assertionFactory.newStreamingAssert(table, ASIA_INDIA); - indiaAssertions.assertMinTransactionId(1L); - indiaAssertions.assertMaxTransactionId(2L); + indiaAssertions.assertMinWriteId(1L); + indiaAssertions.assertMaxWriteId(2L); List<Record> indiaRecords = indiaAssertions.readRecords(2); assertThat(indiaRecords.size(), is(3)); assertThat(indiaRecords.get(0).getRow(), is("{1, Namaste streaming 1}")); @@ -542,8 +542,8 @@ public class TestMutations { encodeBucket(0), 1L))); StreamingAssert ukAssertions = assertionFactory.newStreamingAssert(table, EUROPE_UK); - ukAssertions.assertMinTransactionId(1L); - ukAssertions.assertMaxTransactionId(2L); + ukAssertions.assertMinWriteId(1L); + ukAssertions.assertMaxWriteId(2L); //1 split since mutateTransaction txn just does deletes List<Record> ukRecords = ukAssertions.readRecords(1); assertThat(ukRecords.size(), is(1)); @@ -552,8 +552,8 @@ public class TestMutations { encodeBucket(0), 1L))); StreamingAssert franceAssertions = assertionFactory.newStreamingAssert(table, EUROPE_FRANCE); - franceAssertions.assertMinTransactionId(1L); - franceAssertions.assertMaxTransactionId(2L); + franceAssertions.assertMinWriteId(1L); + franceAssertions.assertMaxWriteId(2L); List<Record> franceRecords = franceAssertions.readRecords(2); assertThat(franceRecords.size(), is(1)); assertThat(franceRecords.get(0).getRow(), is("{6, UPDATED: Bonjour streaming 2}")); http://git-wip-us.apache.org/repos/asf/hive/blob/8f93ca0b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java index 53ae2c0..353b890 
100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java @@ -219,12 +219,12 @@ public class TestAcidOnTez { /* * Expected result 0th entry i the RecordIdentifier + data. 1st entry file before compact*/ String expected[][] = { - {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t1\t2", AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "1/000000_0"}, - {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":1}\t3\t4", AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "1/000000_0"}, - {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":2}\t5\t6", AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "1/000000_0"}, - {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":3}\t9\t10", AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "2/000000_0"}, - {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":4}\t7\t8", AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "2/000000_0"}, - {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":5}\t5\t6", AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "2/000000_0"}, + {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":0}\t1\t2", AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "1/000000_0"}, + {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":1}\t3\t4", AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "1/000000_0"}, + {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":2}\t5\t6", AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "1/000000_0"}, + {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":3}\t9\t10", AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "2/000000_0"}, + {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":4}\t7\t8", AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "2/000000_0"}, + {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":5}\t5\t6", AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "2/000000_0"}, }; Assert.assertEquals("Unexpected row count after ctas", expected.length, rs.size()); //verify data and 
layout @@ -241,10 +241,10 @@ public class TestAcidOnTez { LOG.warn(s); } String[][] expected2 = { - {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t1\t2", AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "1/000000_0"}, - {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":1}\t3\t4", AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "1/000000_0"}, - {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":3}\t9\t10", AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "2/000000_0"}, - {"{\"transactionid\":1,\"bucketid\":536870912,\"rowid\":0}\t70\t80", "delta_0000001_0000001_0000/bucket_00000"} + {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":0}\t1\t2", AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "1/000000_0"}, + {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":1}\t3\t4", AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "1/000000_0"}, + {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":3}\t9\t10", AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "2/000000_0"}, + {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t70\t80", "delta_0000001_0000001_0000/bucket_00000"} }; Assert.assertEquals("Unexpected row count after update", expected2.length, rs.size()); //verify data and layout @@ -391,12 +391,12 @@ public class TestAcidOnTez { LOG.warn(s); } String[][] expected = { - {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t100\t110\t1", "nonacidpart/p=1/000000_0"}, - {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":1}\t1\t2\t1", "nonacidpart/p=1/" + AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "1/000000_0"}, - {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":2}\t3\t4\t1", "nonacidpart/p=1/" + AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "1/000000_0"}, - {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":3}\t9\t10\t1", "nonacidpart/p=1/" + AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "2/000000_0"}, - {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":4}\t7\t8\t1", "nonacidpart/p=1/" + 
AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "2/000000_0"}, - {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":5}\t5\t6\t1", "nonacidpart/p=1/" + AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "3/000000_0"} + {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":0}\t100\t110\t1", "nonacidpart/p=1/000000_0"}, + {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":1}\t1\t2\t1", "nonacidpart/p=1/" + AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "1/000000_0"}, + {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":2}\t3\t4\t1", "nonacidpart/p=1/" + AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "1/000000_0"}, + {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":3}\t9\t10\t1", "nonacidpart/p=1/" + AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "2/000000_0"}, + {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":4}\t7\t8\t1", "nonacidpart/p=1/" + AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "2/000000_0"}, + {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":5}\t5\t6\t1", "nonacidpart/p=1/" + AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "3/000000_0"} }; Assert.assertEquals("Wrong row count", expected.length, rs.size()); //verify data and layout @@ -453,12 +453,12 @@ public class TestAcidOnTez { /* * Expected result 0th entry is the RecordIdentifier + data. 
1st entry file before compact*/ String expected[][] = { - {"{\"transactionid\":1,\"bucketid\":536870913,\"rowid\":0}\t1\t2", "/delta_0000001_0000001_0001/bucket_00000"}, - {"{\"transactionid\":1,\"bucketid\":536870913,\"rowid\":1}\t3\t4", "/delta_0000001_0000001_0001/bucket_00000"}, - {"{\"transactionid\":1,\"bucketid\":536870913,\"rowid\":2}\t5\t6", "/delta_0000001_0000001_0001/bucket_00000"}, - {"{\"transactionid\":1,\"bucketid\":536870914,\"rowid\":0}\t9\t10", "/delta_0000001_0000001_0002/bucket_00000"}, - {"{\"transactionid\":1,\"bucketid\":536870914,\"rowid\":1}\t7\t8", "/delta_0000001_0000001_0002/bucket_00000"}, - {"{\"transactionid\":1,\"bucketid\":536870914,\"rowid\":2}\t5\t6", "/delta_0000001_0000001_0002/bucket_00000"}, + {"{\"writeid\":1,\"bucketid\":536870913,\"rowid\":0}\t1\t2", "/delta_0000001_0000001_0001/bucket_00000"}, + {"{\"writeid\":1,\"bucketid\":536870913,\"rowid\":1}\t3\t4", "/delta_0000001_0000001_0001/bucket_00000"}, + {"{\"writeid\":1,\"bucketid\":536870913,\"rowid\":2}\t5\t6", "/delta_0000001_0000001_0001/bucket_00000"}, + {"{\"writeid\":1,\"bucketid\":536870914,\"rowid\":0}\t9\t10", "/delta_0000001_0000001_0002/bucket_00000"}, + {"{\"writeid\":1,\"bucketid\":536870914,\"rowid\":1}\t7\t8", "/delta_0000001_0000001_0002/bucket_00000"}, + {"{\"writeid\":1,\"bucketid\":536870914,\"rowid\":2}\t5\t6", "/delta_0000001_0000001_0002/bucket_00000"}, }; Assert.assertEquals("Unexpected row count after ctas", expected.length, rs.size()); //verify data and layout @@ -475,10 +475,10 @@ public class TestAcidOnTez { LOG.warn(s); } String[][] expected2 = { - {"{\"transactionid\":1,\"bucketid\":536870913,\"rowid\":0}\t1\t2", "/delta_0000001_0000001_0001/bucket_00000"}, - {"{\"transactionid\":1,\"bucketid\":536870913,\"rowid\":1}\t3\t4", "/delta_0000001_0000001_0001/bucket_00000"}, - {"{\"transactionid\":1,\"bucketid\":536870914,\"rowid\":0}\t9\t10", "/delta_0000001_0000001_0002/bucket_00000"}, - 
{"{\"transactionid\":2,\"bucketid\":536870912,\"rowid\":0}\t70\t80", "/delta_0000002_0000002_0000/bucket_00000"} + {"{\"writeid\":1,\"bucketid\":536870913,\"rowid\":0}\t1\t2", "/delta_0000001_0000001_0001/bucket_00000"}, + {"{\"writeid\":1,\"bucketid\":536870913,\"rowid\":1}\t3\t4", "/delta_0000001_0000001_0001/bucket_00000"}, + {"{\"writeid\":1,\"bucketid\":536870914,\"rowid\":0}\t9\t10", "/delta_0000001_0000001_0002/bucket_00000"}, + {"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t70\t80", "/delta_0000002_0000002_0000/bucket_00000"} }; Assert.assertEquals("Unexpected row count after update", expected2.length, rs.size()); //verify data and layout @@ -605,11 +605,11 @@ ekoifman:apache-hive-3.0.0-SNAPSHOT-bin ekoifman$ tree ~/dev/hiverwgit/itests/h LOG.warn(s); } String[][] expected2 = { - {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "warehouse/t/base_-9223372036854775808/bucket_00000"}, - {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "warehouse/t/base_-9223372036854775808/bucket_00000"}, - {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":2}\t5\t6", "warehouse/t/base_-9223372036854775808/bucket_00000"}, - {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":3}\t7\t8", "warehouse/t/base_-9223372036854775808/bucket_00000"}, - {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":4}\t9\t10", "warehouse/t/base_-9223372036854775808/bucket_00000"} + {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "warehouse/t/base_-9223372036854775808/bucket_00000"}, + {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "warehouse/t/base_-9223372036854775808/bucket_00000"}, + {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":2}\t5\t6", "warehouse/t/base_-9223372036854775808/bucket_00000"}, + {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":3}\t7\t8", "warehouse/t/base_-9223372036854775808/bucket_00000"}, + {"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":4}\t9\t10", 
"warehouse/t/base_-9223372036854775808/bucket_00000"} }; Assert.assertEquals("Unexpected row count after major compact", expected2.length, rs.size()); for(int i = 0; i < expected2.length; i++) { @@ -660,11 +660,11 @@ ekoifman:apache-hive-3.0.0-SNAPSHOT-bin ekoifman$ tree ~/dev/hiverwgit/itests/h } String[][] expected2 = { - {"{\"transactionid\":1,\"bucketid\":536870913,\"rowid\":0}\t1\t2", "warehouse/t/delta_0000001_0000001_0001/bucket_00000"}, - {"{\"transactionid\":1,\"bucketid\":536870913,\"rowid\":1}\t3\t4", "warehouse/t/delta_0000001_0000001_0001/bucket_00000"}, - {"{\"transactionid\":1,\"bucketid\":536870914,\"rowid\":0}\t5\t6", "warehouse/t/delta_0000001_0000001_0002/bucket_00000"}, - {"{\"transactionid\":1,\"bucketid\":536870914,\"rowid\":1}\t7\t8", "warehouse/t/delta_0000001_0000001_0002/bucket_00000"}, - {"{\"transactionid\":1,\"bucketid\":536870915,\"rowid\":0}\t9\t10", "warehouse/t/delta_0000001_0000001_0003/bucket_00000"} + {"{\"writeid\":1,\"bucketid\":536870913,\"rowid\":0}\t1\t2", "warehouse/t/delta_0000001_0000001_0001/bucket_00000"}, + {"{\"writeid\":1,\"bucketid\":536870913,\"rowid\":1}\t3\t4", "warehouse/t/delta_0000001_0000001_0001/bucket_00000"}, + {"{\"writeid\":1,\"bucketid\":536870914,\"rowid\":0}\t5\t6", "warehouse/t/delta_0000001_0000001_0002/bucket_00000"}, + {"{\"writeid\":1,\"bucketid\":536870914,\"rowid\":1}\t7\t8", "warehouse/t/delta_0000001_0000001_0002/bucket_00000"}, + {"{\"writeid\":1,\"bucketid\":536870915,\"rowid\":0}\t9\t10", "warehouse/t/delta_0000001_0000001_0003/bucket_00000"} }; Assert.assertEquals("Unexpected row count", expected2.length, rs.size()); for(int i = 0; i < expected2.length; i++) { @@ -705,11 +705,11 @@ ekoifman:apache-hive-3.0.0-SNAPSHOT-bin ekoifman$ tree ~/dev/hiverwgit/itests/h LOG.warn(s); } String[][] expected2 = { - {"{\"transactionid\":1,\"bucketid\":536936448,\"rowid\":0}\t1\t2", "warehouse/t/delta_0000001_0000001_0000/bucket_00001"}, - 
{"{\"transactionid\":1,\"bucketid\":536870912,\"rowid\":0}\t2\t4", "warehouse/t/delta_0000001_0000001_0000/bucket_00000"}, - {"{\"transactionid\":1,\"bucketid\":536936448,\"rowid\":2}\t5\t6", "warehouse/t/delta_0000001_0000001_0000/bucket_00001"}, - {"{\"transactionid\":1,\"bucketid\":536870912,\"rowid\":1}\t6\t8", "warehouse/t/delta_0000001_0000001_0000/bucket_00000"}, - {"{\"transactionid\":1,\"bucketid\":536936448,\"rowid\":1}\t9\t10", "warehouse/t/delta_0000001_0000001_0000/bucket_00001"} + {"{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t1\t2", "warehouse/t/delta_0000001_0000001_0000/bucket_00001"}, + {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t2\t4", "warehouse/t/delta_0000001_0000001_0000/bucket_00000"}, + {"{\"writeid\":1,\"bucketid\":536936448,\"rowid\":2}\t5\t6", "warehouse/t/delta_0000001_0000001_0000/bucket_00001"}, + {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t6\t8", "warehouse/t/delta_0000001_0000001_0000/bucket_00000"}, + {"{\"writeid\":1,\"bucketid\":536936448,\"rowid\":1}\t9\t10", "warehouse/t/delta_0000001_0000001_0000/bucket_00001"} }; Assert.assertEquals("Unexpected row count", expected2.length, rs.size()); for(int i = 0; i < expected2.length; i++) { http://git-wip-us.apache.org/repos/asf/hive/blob/8f93ca0b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java index 0410fb0..5966740 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java @@ -1376,12 +1376,12 @@ public class TestCompactor { return new long[0]; } @Override - public boolean isValidBase(long txnid) { + public boolean 
isValidBase(long writeid) { return true; } @Override - public boolean isWriteIdAborted(long txnid) { + public boolean isWriteIdAborted(long writeid) { return true; } http://git-wip-us.apache.org/repos/asf/hive/blob/8f93ca0b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java ---------------------------------------------------------------------- diff --git a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java index 07e6eaa..967b105 100644 --- a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java +++ b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java @@ -1730,7 +1730,7 @@ public class QTestUtil { }); /** * Pattern to match and (partial) replacement text. - * For example, {"transaction":76,"bucketid":8249877}. We just want to mask 76 but a regex that + * For example, {"writeid":76,"bucketid":8249877}. We just want to mask 76 but a regex that * matches just 76 will match a lot of other things. 
*/ private final static class PatternReplacementPair { @@ -1744,8 +1744,8 @@ public class QTestUtil { private final PatternReplacementPair[] partialPlanMask; { ArrayList<PatternReplacementPair> ppm = new ArrayList<>(); - ppm.add(new PatternReplacementPair(Pattern.compile("\\{\"transactionid\":[1-9][0-9]*,\"bucketid\":"), - "{\"transactionid\":### Masked txnid ###,\"bucketid\":")); + ppm.add(new PatternReplacementPair(Pattern.compile("\\{\"writeid\":[1-9][0-9]*,\"bucketid\":"), + "{\"writeid\":### Masked writeid ###,\"bucketid\":")); ppm.add(new PatternReplacementPair(Pattern.compile("attempt_[0-9]+"), "attempt_#ID#")); ppm.add(new PatternReplacementPair(Pattern.compile("vertex_[0-9_]+"), "vertex_#ID#")); http://git-wip-us.apache.org/repos/asf/hive/blob/8f93ca0b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java index fd84231..804cd78 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java @@ -1640,7 +1640,7 @@ public final class Utilities { } public static List<Path> removeTempOrDuplicateFiles(FileSystem fs, FileStatus[] fileStats, - String unionSuffix, int dpLevels, int numBuckets, Configuration hconf, Long txnId, + String unionSuffix, int dpLevels, int numBuckets, Configuration hconf, Long writeId, int stmtId, boolean isMmTable, Set<Path> filesKept, boolean isBaseDir) throws IOException { if (fileStats == null) { return null; @@ -1660,7 +1660,7 @@ public final class Utilities { if (isMmTable) { Path mmDir = parts[i].getPath(); - if (!mmDir.getName().equals(AcidUtils.baseOrDeltaSubdir(isBaseDir, txnId, txnId, stmtId))) { + if (!mmDir.getName().equals(AcidUtils.baseOrDeltaSubdir(isBaseDir, writeId, writeId, stmtId))) { throw new IOException("Unexpected non-MM directory name " + 
mmDir); } @@ -1687,7 +1687,7 @@ public final class Utilities { if (fileStats.length == 0) { return result; } - Path mmDir = extractNonDpMmDir(txnId, stmtId, items, isBaseDir); + Path mmDir = extractNonDpMmDir(writeId, stmtId, items, isBaseDir); taskIDToFile = removeTempOrDuplicateFilesNonMm( fs.listStatus(new Path(mmDir, unionSuffix)), fs); if (filesKept != null && taskIDToFile != null) { @@ -1705,7 +1705,7 @@ public final class Utilities { addFilesToPathSet(taskIDToFile.values(), filesKept); } } else { - Path mmDir = extractNonDpMmDir(txnId, stmtId, items, isBaseDir); + Path mmDir = extractNonDpMmDir(writeId, stmtId, items, isBaseDir); taskIDToFile = removeTempOrDuplicateFilesNonMm(fs.listStatus(mmDir), fs); if (filesKept != null && taskIDToFile != null) { addFilesToPathSet(taskIDToFile.values(), filesKept); @@ -1717,12 +1717,12 @@ public final class Utilities { return result; } - private static Path extractNonDpMmDir(Long txnId, int stmtId, FileStatus[] items, boolean isBaseDir) throws IOException { + private static Path extractNonDpMmDir(Long writeId, int stmtId, FileStatus[] items, boolean isBaseDir) throws IOException { if (items.length > 1) { throw new IOException("Unexpected directories for non-DP MM: " + Arrays.toString(items)); } Path mmDir = items[0].getPath(); - if (!mmDir.getName().equals(AcidUtils.baseOrDeltaSubdir(isBaseDir, txnId, txnId, stmtId))) { + if (!mmDir.getName().equals(AcidUtils.baseOrDeltaSubdir(isBaseDir, writeId, writeId, stmtId))) { throw new IOException("Unexpected non-MM directory " + mmDir); } Utilities.FILE_OP_LOGGER.trace("removeTempOrDuplicateFiles processing files in MM directory {}", mmDir); @@ -4070,11 +4070,11 @@ public final class Utilities { } public static Path[] getMmDirectoryCandidates(FileSystem fs, Path path, int dpLevels, - int lbLevels, PathFilter filter, long txnId, int stmtId, Configuration conf, + int lbLevels, PathFilter filter, long writeId, int stmtId, Configuration conf, Boolean isBaseDir) throws IOException { 
int skipLevels = dpLevels + lbLevels; if (filter == null) { - filter = new JavaUtils.IdPathFilter(txnId, stmtId, true, false); + filter = new JavaUtils.IdPathFilter(writeId, stmtId, true, false); } if (skipLevels == 0) { return statusToPath(fs.listStatus(path, filter)); @@ -4087,7 +4087,7 @@ public final class Utilities { || (HiveConf.getBoolVar(conf, ConfVars.HIVE_MM_AVOID_GLOBSTATUS_ON_S3) && isS3(fs))) { return getMmDirectoryCandidatesRecursive(fs, path, skipLevels, filter); } - return getMmDirectoryCandidatesGlobStatus(fs, path, skipLevels, filter, txnId, stmtId, isBaseDir); + return getMmDirectoryCandidatesGlobStatus(fs, path, skipLevels, filter, writeId, stmtId, isBaseDir); } private static boolean isS3(FileSystem fs) { @@ -4164,27 +4164,27 @@ public final class Utilities { } private static Path[] getMmDirectoryCandidatesGlobStatus(FileSystem fs, Path path, int skipLevels, - PathFilter filter, long txnId, int stmtId, boolean isBaseDir) throws IOException { + PathFilter filter, long writeId, int stmtId, boolean isBaseDir) throws IOException { StringBuilder sb = new StringBuilder(path.toUri().getPath()); for (int i = 0; i < skipLevels; i++) { sb.append(Path.SEPARATOR).append('*'); } if (stmtId < 0) { // Note: this does not work. 
- // sb.append(Path.SEPARATOR).append(AcidUtils.deltaSubdir(txnId, txnId)).append("_*"); + // sb.append(Path.SEPARATOR).append(AcidUtils.deltaSubdir(writeId, writeId)).append("_*"); throw new AssertionError("GlobStatus should not be called without a statement ID"); } else { - sb.append(Path.SEPARATOR).append(AcidUtils.baseOrDeltaSubdir(isBaseDir, txnId, txnId, stmtId)); + sb.append(Path.SEPARATOR).append(AcidUtils.baseOrDeltaSubdir(isBaseDir, writeId, writeId, stmtId)); } Path pathPattern = new Path(path, sb.toString()); return statusToPath(fs.globStatus(pathPattern, filter)); } private static void tryDeleteAllMmFiles(FileSystem fs, Path specPath, Path manifestDir, - int dpLevels, int lbLevels, JavaUtils.IdPathFilter filter, long txnId, int stmtId, + int dpLevels, int lbLevels, JavaUtils.IdPathFilter filter, long writeId, int stmtId, Configuration conf) throws IOException { Path[] files = getMmDirectoryCandidates( - fs, specPath, dpLevels, lbLevels, filter, txnId, stmtId, conf, null); + fs, specPath, dpLevels, lbLevels, filter, writeId, stmtId, conf, null); if (files != null) { for (Path path : files) { Utilities.FILE_OP_LOGGER.info("Deleting {} on failure", path); @@ -4197,12 +4197,12 @@ public final class Utilities { public static void writeMmCommitManifest(List<Path> commitPaths, Path specPath, FileSystem fs, - String taskId, Long txnId, int stmtId, String unionSuffix, boolean isInsertOverwrite) throws HiveException { + String taskId, Long writeId, int stmtId, String unionSuffix, boolean isInsertOverwrite) throws HiveException { if (commitPaths.isEmpty()) { return; } // We assume one FSOP per task (per specPath), so we create it in specPath. 
- Path manifestPath = getManifestDir(specPath, txnId, stmtId, unionSuffix, isInsertOverwrite); + Path manifestPath = getManifestDir(specPath, writeId, stmtId, unionSuffix, isInsertOverwrite); manifestPath = new Path(manifestPath, taskId + MANIFEST_EXTENSION); Utilities.FILE_OP_LOGGER.info("Writing manifest to {} with {}", manifestPath, commitPaths); try { @@ -4223,9 +4223,9 @@ public final class Utilities { // TODO: we should get rid of isInsertOverwrite here too. private static Path getManifestDir( - Path specPath, long txnId, int stmtId, String unionSuffix, boolean isInsertOverwrite) { + Path specPath, long writeId, int stmtId, String unionSuffix, boolean isInsertOverwrite) { Path manifestPath = new Path(specPath, "_tmp." + - AcidUtils.baseOrDeltaSubdir(isInsertOverwrite, txnId, txnId, stmtId)); + AcidUtils.baseOrDeltaSubdir(isInsertOverwrite, writeId, writeId, stmtId)); return (unionSuffix == null) ? manifestPath : new Path(manifestPath, unionSuffix); } @@ -4242,19 +4242,19 @@ public final class Utilities { } public static void handleMmTableFinalPath(Path specPath, String unionSuffix, Configuration hconf, - boolean success, int dpLevels, int lbLevels, MissingBucketsContext mbc, long txnId, int stmtId, + boolean success, int dpLevels, int lbLevels, MissingBucketsContext mbc, long writeId, int stmtId, Reporter reporter, boolean isMmTable, boolean isMmCtas, boolean isInsertOverwrite) throws IOException, HiveException { FileSystem fs = specPath.getFileSystem(hconf); - Path manifestDir = getManifestDir(specPath, txnId, stmtId, unionSuffix, isInsertOverwrite); + Path manifestDir = getManifestDir(specPath, writeId, stmtId, unionSuffix, isInsertOverwrite); if (!success) { - JavaUtils.IdPathFilter filter = new JavaUtils.IdPathFilter(txnId, stmtId, true); + JavaUtils.IdPathFilter filter = new JavaUtils.IdPathFilter(writeId, stmtId, true); tryDeleteAllMmFiles(fs, specPath, manifestDir, dpLevels, lbLevels, - filter, txnId, stmtId, hconf); + filter, writeId, stmtId, hconf); 
return; } - Utilities.FILE_OP_LOGGER.debug("Looking for manifests in: {} ({})", manifestDir, txnId); + Utilities.FILE_OP_LOGGER.debug("Looking for manifests in: {} ({})", manifestDir, writeId); List<Path> manifests = new ArrayList<>(); if (fs.exists(manifestDir)) { FileStatus[] manifestFiles = fs.listStatus(manifestDir); @@ -4273,13 +4273,13 @@ public final class Utilities { } Utilities.FILE_OP_LOGGER.debug("Looking for files in: {}", specPath); - JavaUtils.IdPathFilter filter = new JavaUtils.IdPathFilter(txnId, stmtId, true, false); + JavaUtils.IdPathFilter filter = new JavaUtils.IdPathFilter(writeId, stmtId, true, false); if (isMmCtas && !fs.exists(specPath)) { Utilities.FILE_OP_LOGGER.info("Creating table directory for CTAS with no output at {}", specPath); FileUtils.mkdir(fs, specPath, hconf); } Path[] files = getMmDirectoryCandidates( - fs, specPath, dpLevels, lbLevels, filter, txnId, stmtId, hconf, isInsertOverwrite); + fs, specPath, dpLevels, lbLevels, filter, writeId, stmtId, hconf, isInsertOverwrite); ArrayList<Path> mmDirectories = new ArrayList<>(); if (files != null) { for (Path path : files) { @@ -4339,7 +4339,7 @@ public final class Utilities { finalResults[i] = new PathOnlyFileStatus(mmDirectories.get(i)); } List<Path> emptyBuckets = Utilities.removeTempOrDuplicateFiles(fs, finalResults, - unionSuffix, dpLevels, mbc == null ? 0 : mbc.numBuckets, hconf, txnId, stmtId, + unionSuffix, dpLevels, mbc == null ? 
0 : mbc.numBuckets, hconf, writeId, stmtId, isMmTable, null, isInsertOverwrite); // create empty buckets if necessary if (!emptyBuckets.isEmpty()) { http://git-wip-us.apache.org/repos/asf/hive/blob/8f93ca0b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java index ff55f50..6588385 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java @@ -87,7 +87,7 @@ public class VectorizedRowBatchCtx { /** * A record ID column is a virtual column, so it should be separated from normal data column * processes. A recordIdColumnVector contains RecordIdentifier information in a - * StructColumnVector. It has three LongColumnVectors as its fields; original transaction IDs, + * StructColumnVector. It has three LongColumnVectors as its fields; original write IDs, * bucket IDs, and row IDs. */ private StructColumnVector recordIdColumnVector; http://git-wip-us.apache.org/repos/asf/hive/blob/8f93ca0b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java index 1ed35b3..41007e2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java @@ -56,9 +56,9 @@ import java.util.List; * online transactions systems. * <p> * The design changes the layout of data within a partition from being in files - * at the top level to having base and delta directories. 
Each write operation - * will be assigned a sequential global transaction id and each read operation - * will request the list of valid transaction ids. + * at the top level to having base and delta directories. Each write operation in a table + * will be assigned a sequential table write id and each read operation + * will request the list of valid transactions/write ids. * <ul> * <li>Old format - * <pre> @@ -66,28 +66,28 @@ import java.util.List; * </pre></li> * <li>New format - * <pre> - * $partition/base_$tid/$bucket - * delta_$tid_$tid_$stid/$bucket + * $partition/base_$wid/$bucket + * delta_$wid_$wid_$stid/$bucket * </pre></li> * </ul> * <p> * With each new write operation a new delta directory is created with events * that correspond to inserted, updated, or deleted rows. Each of the files is - * stored sorted by the original transaction id (ascending), bucket (ascending), - * row id (ascending), and current transaction id (descending). Thus the files + * stored sorted by the original write id (ascending), bucket (ascending), + * row id (ascending), and current write id (descending). Thus the files * can be merged by advancing through the files in parallel. * The stid is unique id (within the transaction) of the statement that created * this delta file. * <p> * The base files include all transactions from the beginning of time - * (transaction id 0) to the transaction in the directory name. Delta - * directories include transactions (inclusive) between the two transaction ids. + * (write id 0) to the write id in the directory name. Delta + * directories include transactions (inclusive) between the two write ids. * <p> - * Because read operations get the list of valid transactions when they start, + * Because read operations get the list of valid transactions/write ids when they start, * all reads are performed on that snapshot, regardless of any transactions that * are committed afterwards. 
* <p> - * The base and the delta directories have the transaction ids so that major + * The base and the delta directories have the write ids so that major * (merge all deltas into the base) and minor (merge several deltas together) * compactions can happen while readers continue their processing. * <p> @@ -204,7 +204,7 @@ public interface AcidInputFormat<KEY extends WritableComparable, VALUE> /** * Get a record reader that provides the user-facing view of the data after * it has been merged together. The key provides information about the - * record's identifier (transaction, bucket, record id). + * record's identifier (write id, bucket, record id). * @param split the split to read * @param options the options to read with * @return a record reader http://git-wip-us.apache.org/repos/asf/hive/blob/8f93ca0b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java index 8dc1e8a..ced84b3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java @@ -623,7 +623,7 @@ public class AcidUtils { /** * Get the list of obsolete directories. After filtering out bases and - * deltas that are not selected by the valid transaction list, return the + * deltas that are not selected by the valid transaction/write ids list, return the * list of original files, bases, and deltas that have been replaced by * more up to date ones. Not {@code null}. */ @@ -695,7 +695,7 @@ public class AcidUtils { /** * Compactions (Major/Minor) merge deltas/bases but delete of old files * happens in a different process; thus it's possible to have bases/deltas with - * overlapping txnId boundaries. The sort order helps figure out the "best" set of files + * overlapping writeId boundaries. 
The sort order helps figure out the "best" set of files * to use to get data. * This sorts "wider" delta before "narrower" i.e. delta_5_20 sorts before delta_5_10 (and delta_11_20) */ @@ -718,7 +718,7 @@ public class AcidUtils { /** * We want deltas after minor compaction (w/o statementId) to sort * earlier so that getAcidState() considers compacted files (into larger ones) obsolete - * Before compaction, include deltas with all statementIds for a given txnId + * Before compaction, include deltas with all statementIds for a given writeId * in a {@link org.apache.hadoop.hive.ql.io.AcidUtils.Directory} */ if(statementId < parsedDelta.statementId) { @@ -749,9 +749,9 @@ public class AcidUtils { /** * Convert the list of deltas into an equivalent list of begin/end - * transaction id pairs. Assumes {@code deltas} is sorted. + * write id pairs. Assumes {@code deltas} is sorted. * @param deltas - * @return the list of transaction ids to serialize + * @return the list of write ids to serialize */ public static List<AcidInputFormat.DeltaMetaData> serializeDeltas(List<ParsedDelta> deltas) { List<AcidInputFormat.DeltaMetaData> result = new ArrayList<>(deltas.size()); @@ -774,12 +774,12 @@ public class AcidUtils { } /** - * Convert the list of begin/end transaction id pairs to a list of delete delta + * Convert the list of begin/end write id pairs to a list of delete delta * directories. Note that there may be multiple delete_delta files for the exact same txn range starting * with 2.2.x; * see {@link org.apache.hadoop.hive.ql.io.AcidUtils#deltaSubdir(long, long, int)} * @param root the root directory - * @param deleteDeltas list of begin/end transaction id pairs + * @param deleteDeltas list of begin/end write id pairs * @return the list of delta paths */ public static Path[] deserializeDeleteDeltas(Path root, final List<AcidInputFormat.DeltaMetaData> deleteDeltas) throws IOException { @@ -879,7 +879,7 @@ public class AcidUtils { * Get the ACID state of the given directory. 
It finds the minimal set of * base and diff directories. Note that because major compactions don't * preserve the history, we can't use a base directory that includes a - * transaction id that we must exclude. + * write id that we must exclude. * @param directory the partition directory to analyze * @param conf the configuration * @param writeIdList the list of write ids that we are reading @@ -1075,7 +1075,7 @@ public class AcidUtils { * files within the snapshot. * A base produced by Insert Overwrite is different. Logically it's a delta file but one that * causes anything written previously is ignored (hence the overwrite). In this case, base_x - * is visible if txnid:x is committed for current reader. + * is visible if writeid:x is committed for current reader. */ private static boolean isValidBase(long baseWriteId, ValidWriteIdList writeIdList, Path baseDir, FileSystem fs) throws IOException { @@ -1645,7 +1645,7 @@ public class AcidUtils { try { Reader reader = OrcFile.createReader(dataFile, OrcFile.readerOptions(fs.getConf())); /* - acid file would have schema like <op, otid, writerId, rowid, ctid, <f1, ... fn>> so could + acid file would have schema like <op, owid, writerId, rowid, cwid, <f1, ... 
fn>> so could check it this way once/if OrcRecordUpdater.ACID_KEY_INDEX_NAME is removed TypeDescription schema = reader.getSchema(); List<String> columns = schema.getFieldNames(); http://git-wip-us.apache.org/repos/asf/hive/blob/8f93ca0b/ql/src/java/org/apache/hadoop/hive/ql/io/RecordIdentifier.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/RecordIdentifier.java b/ql/src/java/org/apache/hadoop/hive/ql/io/RecordIdentifier.java index 1f673da..607abfd 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/RecordIdentifier.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/RecordIdentifier.java @@ -45,7 +45,7 @@ public class RecordIdentifier implements WritableComparable<RecordIdentifier> { */ public enum Field { //note the enum names match field names in the struct - transactionId(TypeInfoFactory.longTypeInfo, + writeId(TypeInfoFactory.longTypeInfo, PrimitiveObjectInspectorFactory.javaLongObjectInspector), bucketId(TypeInfoFactory.intTypeInfo, PrimitiveObjectInspectorFactory.javaIntObjectInspector), rowId(TypeInfoFactory.longTypeInfo, PrimitiveObjectInspectorFactory.javaLongObjectInspector); @@ -88,13 +88,13 @@ public class RecordIdentifier implements WritableComparable<RecordIdentifier> { Arrays.fill(struct, null); return; } - struct[Field.transactionId.ordinal()] = ri.getWriteId(); + struct[Field.writeId.ordinal()] = ri.getWriteId(); struct[Field.bucketId.ordinal()] = ri.getBucketProperty(); struct[Field.rowId.ordinal()] = ri.getRowId(); } } - private long transactionId; + private long writeId; private int bucketId; private long rowId; @@ -102,7 +102,7 @@ public class RecordIdentifier implements WritableComparable<RecordIdentifier> { } public RecordIdentifier(long writeId, int bucket, long rowId) { - this.transactionId = writeId; + this.writeId = writeId; this.bucketId = bucket; this.rowId = rowId; } @@ -114,7 +114,7 @@ public class RecordIdentifier implements 
WritableComparable<RecordIdentifier> { * @param rowId the row id */ public void setValues(long writeId, int bucketId, long rowId) { - this.transactionId = writeId; + this.writeId = writeId; this.bucketId = bucketId; this.rowId = rowId; } @@ -124,7 +124,7 @@ public class RecordIdentifier implements WritableComparable<RecordIdentifier> { * @param other the object to copy from */ public void set(RecordIdentifier other) { - this.transactionId = other.transactionId; + this.writeId = other.writeId; this.bucketId = other.bucketId; this.rowId = other.rowId; } @@ -138,7 +138,7 @@ public class RecordIdentifier implements WritableComparable<RecordIdentifier> { * @return the write id */ public long getWriteId() { - return transactionId; + return writeId; } /** @@ -161,8 +161,8 @@ public class RecordIdentifier implements WritableComparable<RecordIdentifier> { if (other == null) { return -1; } - if (transactionId != other.transactionId) { - return transactionId < other.transactionId ? -1 : 1; + if (writeId != other.writeId) { + return writeId < other.writeId ? -1 : 1; } if (bucketId != other.bucketId) { return bucketId < other.bucketId ? 
- 1 : 1; @@ -183,14 +183,14 @@ public class RecordIdentifier implements WritableComparable<RecordIdentifier> { @Override public void write(DataOutput dataOutput) throws IOException { - dataOutput.writeLong(transactionId); + dataOutput.writeLong(writeId); dataOutput.writeInt(bucketId); dataOutput.writeLong(rowId); } @Override public void readFields(DataInput dataInput) throws IOException { - transactionId = dataInput.readLong(); + writeId = dataInput.readLong(); bucketId = dataInput.readInt(); rowId = dataInput.readLong(); } @@ -204,14 +204,14 @@ public class RecordIdentifier implements WritableComparable<RecordIdentifier> { return false; } RecordIdentifier oth = (RecordIdentifier) other; - return oth.transactionId == transactionId && + return oth.writeId == writeId && oth.bucketId == bucketId && oth.rowId == rowId; } @Override public int hashCode() { int result = 17; - result = 31 * result + (int)(transactionId ^ (transactionId >>> 32)); + result = 31 * result + (int)(writeId ^ (writeId >>> 32)); result = 31 * result + bucketId; result = 31 * result + (int)(rowId ^ (rowId >>> 32)); return result; @@ -223,7 +223,7 @@ public class RecordIdentifier implements WritableComparable<RecordIdentifier> { BucketCodec.determineVersion(bucketId); String s = "(" + codec.getVersion() + "." + codec.decodeWriterId(bucketId) + "." 
+ codec.decodeStatementId(bucketId) + ")"; - return "{originalWriteId: " + transactionId + ", " + bucketToString() + ", row: " + getRowId() +"}"; + return "{originalWriteId: " + writeId + ", " + bucketToString() + ", row: " + getRowId() +"}"; } protected String bucketToString() { BucketCodec codec = http://git-wip-us.apache.org/repos/asf/hive/blob/8f93ca0b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java index cf0d013..fe109d7 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java @@ -149,8 +149,8 @@ import com.google.protobuf.CodedInputStream; * } * </pre> * Each AcidEvent object corresponds to an update event. The - * originalTransaction, bucket, and rowId are the unique identifier for the row. - * The operation and currentTransaction are the operation and the transaction + * originalWriteId, bucket, and rowId are the unique identifier for the row. + * The operation and currentWriteId are the operation and the table write id within current txn * that added this event. Insert and update events include the entire row, while * delete events have null for row. 
*/ http://git-wip-us.apache.org/repos/asf/hive/blob/8f93ca0b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java index 4059c53..5655ee9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java @@ -131,7 +131,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{ } if(isDeleteEvent != oth.isDeleteEvent) { //this is to break a tie if insert + delete of a given row is done within the same - //txn (so that currentTransactionId is the same for both events) and we want the + //txn (so that currentWriteId is the same for both events) and we want the //delete event to sort 1st since it needs to be sent up so that // OrcInputFormat.getReader(InputSplit inputSplit, Options options) can skip it. return isDeleteEvent ? -1 : +1; @@ -330,9 +330,9 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{ final int bucketId; final int bucketProperty; /** - * TransactionId to use when generating synthetic ROW_IDs + * Write Id to use when generating synthetic ROW_IDs */ - final long transactionId; + final long writeId; /** * @param statementId - this should be from delta_x_y_stmtId file name. Imagine 2 load data * statements in 1 txn. 
The stmtId will be embedded in @@ -344,7 +344,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{ this.bucketId = bucketId; assert bucketId >= 0 : "don't support non-bucketed tables yet"; this.bucketProperty = encodeBucketId(conf, bucketId, statementId); - transactionId = mergeOptions.getTransactionId(); + writeId = mergeOptions.getWriteId(); } @Override public final OrcStruct nextRecord() { return nextRecord; @@ -374,9 +374,9 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{ new IntWritable(OrcRecordUpdater.INSERT_OPERATION); nextRecord().setFieldValue(OrcRecordUpdater.OPERATION, operation); nextRecord().setFieldValue(OrcRecordUpdater.CURRENT_WRITEID, - new LongWritable(transactionId)); + new LongWritable(writeId)); nextRecord().setFieldValue(OrcRecordUpdater.ORIGINAL_WRITEID, - new LongWritable(transactionId)); + new LongWritable(writeId)); nextRecord().setFieldValue(OrcRecordUpdater.BUCKET, new IntWritable(bucketProperty)); nextRecord().setFieldValue(OrcRecordUpdater.ROW_ID, @@ -388,17 +388,17 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{ ((IntWritable) next.getFieldValue(OrcRecordUpdater.OPERATION)) .set(OrcRecordUpdater.INSERT_OPERATION); ((LongWritable) next.getFieldValue(OrcRecordUpdater.ORIGINAL_WRITEID)) - .set(transactionId); + .set(writeId); ((IntWritable) next.getFieldValue(OrcRecordUpdater.BUCKET)) .set(bucketProperty); ((LongWritable) next.getFieldValue(OrcRecordUpdater.CURRENT_WRITEID)) - .set(transactionId); + .set(writeId); ((LongWritable) next.getFieldValue(OrcRecordUpdater.ROW_ID)) .set(nextRowId); nextRecord().setFieldValue(OrcRecordUpdater.ROW, getRecordReader().next(OrcRecordUpdater.getRow(next))); } - key.setValues(transactionId, bucketProperty, nextRowId, transactionId, false); + key.setValues(writeId, bucketProperty, nextRowId, writeId, false); if (getMaxKey() != null && key.compareRow(getMaxKey()) > 0) { if (LOG.isDebugEnabled()) { 
LOG.debug("key " + key + " > maxkey " + getMaxKey()); @@ -498,7 +498,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{ * If this is not the 1st file, set minKey 1 less than the start of current file * (Would not need to set minKey if we knew that there are no delta files) * {@link #advanceToMinKey()} needs this */ - newMinKey = new RecordIdentifier(transactionId, bucketProperty,rowIdOffset - 1); + newMinKey = new RecordIdentifier(writeId, bucketProperty,rowIdOffset - 1); } if (maxKey != null) { maxKey.setRowId(maxKey.getRowId() + rowIdOffset); @@ -511,7 +511,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{ * of the file so we want to leave it blank to make sure any insert events in delta * files are included; Conversely, if it's not the last file, set the maxKey so that * events from deltas that don't modify anything in the current split are excluded*/ - newMaxKey = new RecordIdentifier(transactionId, bucketProperty, + newMaxKey = new RecordIdentifier(writeId, bucketProperty, rowIdOffset + reader.getNumberOfRows() - 1); } this.minKey = newMinKey; @@ -800,7 +800,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{ private Path baseDir; private boolean isMajorCompaction = false; private boolean isDeleteReader = false; - private long transactionId = 0; + private long writeId = 0; Options copyIndex(int copyIndex) { assert copyIndex >= 0; this.copyIndex = copyIndex; @@ -829,8 +829,8 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{ assert !isCompacting; return this; } - Options transactionId(long transactionId) { - this.transactionId = transactionId; + Options writeId(long writeId) { + this.writeId = writeId; return this; } Options baseDir(Path baseDir) { @@ -876,10 +876,10 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{ * for reading "original" files - i.e. not native acid schema. 
Default value of 0 is * appropriate for files that existed in a table before it was made transactional. 0 is the * primordial transaction. For non-native files resulting from Load Data command, they - * are located and base_x or delta_x_x and then transactionId == x. + * are located and base_x or delta_x_x and then writeId == x. */ - long getTransactionId() { - return transactionId; + long getWriteId() { + return writeId; } /** @@ -1158,7 +1158,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{ static final class TransactionMetaData { final long syntheticWriteId; /** - * folder which determines the transaction id to use in synthetic ROW_IDs + * folder which determines the write id to use in synthetic ROW_IDs */ final Path folder; final int statementId; @@ -1175,7 +1175,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{ Path parent = splitPath.getParent(); if(rootPath.equals(parent)) { //the 'isOriginal' file is at the root of the partition (or table) thus it is - //from a pre-acid conversion write and belongs to primordial txnid:0. + //from a pre-acid conversion write and belongs to primordial writeid:0. 
return new TransactionMetaData(0, parent); } while(parent != null && !rootPath.equals(parent)) { @@ -1199,7 +1199,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{ if(parent == null) { //split is marked isOriginal but it's not an immediate child of a partition nor is it in a //base/ or delta/ - this should never happen - throw new IllegalStateException("Cannot determine transaction id for original file " + throw new IllegalStateException("Cannot determine write id for original file " + splitPath + " in " + rootPath); } //"warehouse/t/HIVE_UNION_SUBDIR_15/000000_0" is a meaningful path for nonAcid2acid @@ -1215,8 +1215,8 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{ * in {@link AcidUtils.Directory#getOriginalFiles()} * @return modified clone of {@code baseOptions} */ - private Options modifyForNonAcidSchemaRead(Options baseOptions, long transactionId, Path rootPath) { - return baseOptions.clone().transactionId(transactionId).rootPath(rootPath); + private Options modifyForNonAcidSchemaRead(Options baseOptions, long writeId, Path rootPath) { + return baseOptions.clone().writeId(writeId).rootPath(rootPath); } /** * This determines the set of {@link ReaderPairAcid} to create for a given delta/.