[
https://issues.apache.org/jira/browse/HIVE-20825?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Tom Zeng updated HIVE-20825:
----------------------------
Attachment: hive-merge-invalid-orc-repro.log
hive-merge-invalid-orc-repro.hql
> Hive ACID Merge generates invalid ORC files (bucket files 0 or 3 bytes in
> length) causing the "Not a valid ORC file" error
> --------------------------------------------------------------------------------------------------------------------------
>
> Key: HIVE-20825
> URL: https://issues.apache.org/jira/browse/HIVE-20825
> Project: Hive
> Issue Type: Bug
> Components: Hive, ORC, Transactions
> Affects Versions: 2.2.0, 2.3.1, 2.3.2
> Environment: Hive 2.3.x on Amazon EMR 5.8.0 to 5.18.0
> Reporter: Tom Zeng
> Priority: Major
> Attachments: hive-merge-invalid-orc-repro.hql,
> hive-merge-invalid-orc-repro.log
>
>
> When using Hive ACID Merge (supported with the ORC format) to update/insert
> data, bucket files with 0 byte or 3 bytes (file content is three character:
> ORC) are generated during MERGE INTO operations which finish with no errors.
> Subsequent queries on the base table will get "Not a valid ORC file" error.
>
> The following script can be used to reproduce the issue:
> set hive.auto.convert.join=false;
> set hive.enforce.bucketing=true;
> set hive.exec.dynamic.partition.mode = nonstrict;
> set hive.support.concurrency=true;
> set hive.txn.manager = org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
> drop table if exists mergedelta_txt_1;
> drop table if exists mergedelta_txt_2;
> CREATE TABLE mergedelta_txt_1 (
> id_str varchar(12), time_key int, value bigint)
> PARTITIONED BY (date_key int)
> ROW FORMAT DELIMITED
> STORED AS TEXTFILE;
> CREATE TABLE mergedelta_txt_2 (
> id_str varchar(12), time_key int, value bigint)
> PARTITIONED BY (date_key int)
> ROW FORMAT DELIMITED
> STORED AS TEXTFILE;
> INSERT INTO TABLE mergedelta_txt_1
> partition(date_key=20170103)
> VALUES
> ("AB94LIENR0",46700,12345676836978),
> ("AB94LIENR1",46825,12345676836978),
> ("AB94LIENS0",46709,12345676836978),
> ("AB94LIENS1",46834,12345676836978),
> ("AB94LIENT0",46709,12345676836978),
> ("AB94LIENT1",46834,12345676836978),
> ("AB94LIENU0",46718,12345676836978),
> ("AB94LIENU1",46844,12345676836978),
> ("AB94LIENV0",46719,12345676836978),
> ("AB94LIENV1",46844,12345676836978),
> ("AB94LIENW0",46728,12345676836978),
> ("AB94LIENW1",46854,12345676836978),
> ("AB94LIENX0",46728,12345676836978),
> ("AB94LIENX1",46854,12345676836978),
> ("AB94LIENY0",46737,12345676836978),
> ("AB94LIENY1",46863,12345676836978),
> ("AB94LIENZ0",46738,12345676836978),
> ("AB94LIENZ1",46863,12345676836978),
> ("AB94LIERA0",47176,12345676836982),
> ("AB94LIERA1",47302,12345676836982);
> INSERT INTO TABLE mergedelta_txt_2
> partition(date_key=20170103)
> VALUES
> ("AB94LIENT1",46834,12345676836978),
> ("AB94LIENU0",46718,12345676836978),
> ("AB94LIENU1",46844,12345676836978),
> ("AB94LIENV0",46719,12345676836978),
> ("AB94LIENV1",46844,12345676836978),
> ("AB94LIENW0",46728,12345676836978),
> ("AB94LIENW1",46854,12345676836978),
> ("AB94LIENX0",46728,12345676836978),
> ("AB94LIENX1",46854,12345676836978),
> ("AB94LIENY0",46737,12345676836978),
> ("AB94LIENY1",46863,12345676836978),
> ("AB94LIENZ0",46738,12345676836978),
> ("AB94LIENZ1",46863,12345676836978),
> ("AB94LIERA0",47176,12345676836982),
> ("AB94LIERA1",47302,12345676836982),
> ("AB94LIERA2",47418,12345676836982),
> ("AB94LIERB0",47176,12345676836982),
> ("AB94LIERB1",47302,12345676836982),
> ("AB94LIERB2",47418,12345676836982),
> ("AB94LIERC0",47185,12345676836982);
> DROP TABLE IF EXISTS mergebase_1;
> CREATE TABLE mergebase_1 (
> id_str varchar(12) , time_key int , value bigint)
> PARTITIONED BY (date_key int)
> CLUSTERED BY (id_str,time_key) INTO 32 BUCKETS
> STORED AS ORC
> TBLPROPERTIES (
> 'orc.compress'='SNAPPY',
> 'pk_columns'='id_str,date_key,time_key',
> 'NO_AUTO_COMPACTION'='true',
> 'transactional'='true');
> MERGE INTO mergebase_1 AS base
> USING (SELECT *
> FROM (
> SELECT id_str ,time_key ,value, date_key, rank() OVER (PARTITION BY
> id_str,date_key,time_key ORDER BY id_str,date_key,time_key) AS rk
> FROM mergedelta_txt_1
> DISTRIBUTE BY date_key
> ) rankedtbl
> WHERE rankedtbl.rk=1
> ) AS delta
> ON delta.id_str=base.id_str AND delta.date_key=base.date_key AND
> delta.time_key=base.time_key
> WHEN MATCHED THEN UPDATE SET value=delta.value
> WHEN NOT MATCHED THEN INSERT VALUES ( delta.id_str , delta.time_key ,
> delta.value, delta.date_key);
> MERGE INTO mergebase_1 AS base
> USING (SELECT *
> FROM (
> SELECT id_str ,time_key ,value, date_key, rank() OVER (PARTITION BY
> id_str,date_key,time_key ORDER BY id_str,date_key,time_key) AS rk
> FROM mergedelta_txt_2
> DISTRIBUTE BY date_key
> ) rankedtbl
> WHERE rankedtbl.rk=1
> ) AS delta
> ON delta.id_str=base.id_str AND delta.date_key=base.date_key AND
> delta.time_key=base.time_key
> WHEN MATCHED THEN UPDATE SET value=delta.value
> WHEN NOT MATCHED THEN INSERT VALUES ( delta.id_str , delta.time_key ,
> delta.value, delta.date_key);
> select count(*) from mergebase_1;
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)