[
https://issues.apache.org/jira/browse/HIVE-28087?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Simhadri Govindappa updated HIVE-28087:
---------------------------------------
Description:
Insert into partitioned table fails with the following error if the data is not
clustered.
*Using the cluster by clause, it succeeds:*
{noformat}
0: jdbc:hive2://localhost:10001/> insert into table partition_transform_4
select t, ts from t1 cluster by ts;
----------------------------------------------------------------------------------------------
VERTICES MODE STATUS TOTAL COMPLETED RUNNING PENDING
FAILED KILLED
----------------------------------------------------------------------------------------------
Map 1 .......... container SUCCEEDED 1 1 0 0
0 0
Reducer 2 ...... container SUCCEEDED 1 1 0 0
0 0
----------------------------------------------------------------------------------------------
VERTICES: 02/02 [==========================>>] 100% ELAPSED TIME: 9.47 s
----------------------------------------------------------------------------------------------
INFO : Starting task [Stage-2:DEPENDENCY_COLLECTION] in serial mode
INFO : Starting task [Stage-0:MOVE] in serial mode
INFO : Completed executing
command(queryId=root_20240222123244_0c448b32-4fd9-420d-be31-e39e2972af82);
Time taken: 10.534 seconds
100 rows affected (10.696 seconds){noformat}
*Without cluster by, it fails:*
{noformat}
0: jdbc:hive2://localhost:10001/> insert into table partition_transform_4
select t, ts from t1;
----------------------------------------------------------------------------------------------
VERTICES MODE STATUS TOTAL COMPLETED RUNNING PENDING
FAILED KILLED
----------------------------------------------------------------------------------------------
Map 1 .......... container SUCCEEDED 1 1 0 0
0 0
Reducer 2 container RUNNING 1 0 1 0
2 0
----------------------------------------------------------------------------------------------
VERTICES: 01/02 [=============>>-------------] 50% ELAPSED TIME: 9.53 s
----------------------------------------------------------------------------------------------
Caused by: java.lang.IllegalStateException: Incoming records violate the writer
assumption that records are clustered by spec and by partition within each
spec. Either cluster the incoming records or switch to fanout writers.
Encountered records that belong to already closed files:
partition 'ts_month=2027-03' in spec [
1000: ts_month: month(2)
]
at org.apache.iceberg.io.ClusteredWriter.write(ClusteredWriter.java:96)
at
org.apache.iceberg.io.ClusteredDataWriter.write(ClusteredDataWriter.java:31)
at
org.apache.iceberg.mr.hive.writer.HiveIcebergRecordWriter.write(HiveIcebergRecordWriter.java:53)
at
org.apache.hadoop.hive.ql.exec.FileSinkOperator.process(FileSinkOperator.java:1181)
at
org.apache.hadoop.hive.ql.exec.vector.VectorFileSinkOperator.process(VectorFileSinkOperator.java:111)
at
org.apache.hadoop.hive.ql.exec.Operator.vectorForward(Operator.java:919)
at
org.apache.hadoop.hive.ql.exec.vector.VectorSelectOperator.process(VectorSelectOperator.java:158)
at
org.apache.hadoop.hive.ql.exec.tez.ReduceRecordSource.processVectorGroup(ReduceRecordSource.java:502)
... 20 more{noformat}
A simple repro, using the attached csv file:
[^query-hive-377.csv]
{noformat}
create database t3;
use t3;
create table vector1k(
t int,
si int,
i int,
b bigint,
f float,
d double,
dc decimal(38,18),
bo boolean,
s string,
s2 string,
ts timestamp,
ts2 timestamp,
dt date)
row format delimited fields terminated by ',';
load data local inpath "/query-hive-377.csv" OVERWRITE into table vector1k;
select * from vector1k; create table vectortab10k(
t int,
si int,
i int,
b bigint,
f float,
d double,
dc decimal(38,18),
bo boolean,
s string,
s2 string,
ts timestamp,
ts2 timestamp,
dt date)
stored by iceberg
stored as orc;
insert into vectortab10k select * from vector1k;
select count(*) from vectortab10k ;
create table partition_transform_4(t int, ts timestamp) partitioned by
spec(month(ts)) stored by iceberg;
insert into table partition_transform_4 select t, ts from vectortab10k ;
{noformat}
was:
Insert into partitioned table fails with the following error if the data is not
clustered.
*Using the cluster by clause, it succeeds:*
{noformat}
0: jdbc:hive2://localhost:10001/> insert into table partition_transform_4
select t, ts from t1 cluster by ts;
----------------------------------------------------------------------------------------------
VERTICES MODE STATUS TOTAL COMPLETED RUNNING PENDING
FAILED KILLED
----------------------------------------------------------------------------------------------
Map 1 .......... container SUCCEEDED 1 1 0 0
0 0
Reducer 2 ...... container SUCCEEDED 1 1 0 0
0 0
----------------------------------------------------------------------------------------------
VERTICES: 02/02 [==========================>>] 100% ELAPSED TIME: 9.47 s
----------------------------------------------------------------------------------------------
INFO : Starting task [Stage-2:DEPENDENCY_COLLECTION] in serial mode
INFO : Starting task [Stage-0:MOVE] in serial mode
INFO : Completed executing
command(queryId=root_20240222123244_0c448b32-4fd9-420d-be31-e39e2972af82);
Time taken: 10.534 seconds
100 rows affected (10.696 seconds){noformat}
*Without cluster by, it fails:*
{noformat}
0: jdbc:hive2://localhost:10001/> insert into table partition_transform_4
select t, ts from t1;
----------------------------------------------------------------------------------------------
VERTICES MODE STATUS TOTAL COMPLETED RUNNING PENDING
FAILED KILLED
----------------------------------------------------------------------------------------------
Map 1 .......... container SUCCEEDED 1 1 0 0
0 0
Reducer 2 container RUNNING 1 0 1 0
2 0
----------------------------------------------------------------------------------------------
VERTICES: 01/02 [=============>>-------------] 50% ELAPSED TIME: 9.53 s
----------------------------------------------------------------------------------------------
Caused by: java.lang.IllegalStateException: Incoming records violate the writer
assumption that records are clustered by spec and by partition within each
spec. Either cluster the incoming records or switch to fanout writers.
Encountered records that belong to already closed files:
partition 'ts_month=2027-03' in spec [
1000: ts_month: month(2)
]
at org.apache.iceberg.io.ClusteredWriter.write(ClusteredWriter.java:96)
at
org.apache.iceberg.io.ClusteredDataWriter.write(ClusteredDataWriter.java:31)
at
org.apache.iceberg.mr.hive.writer.HiveIcebergRecordWriter.write(HiveIcebergRecordWriter.java:53)
at
org.apache.hadoop.hive.ql.exec.FileSinkOperator.process(FileSinkOperator.java:1181)
at
org.apache.hadoop.hive.ql.exec.vector.VectorFileSinkOperator.process(VectorFileSinkOperator.java:111)
at
org.apache.hadoop.hive.ql.exec.Operator.vectorForward(Operator.java:919)
at
org.apache.hadoop.hive.ql.exec.vector.VectorSelectOperator.process(VectorSelectOperator.java:158)
at
org.apache.hadoop.hive.ql.exec.tez.ReduceRecordSource.processVectorGroup(ReduceRecordSource.java:502)
... 20 more{noformat}
A simple repro, using the attached csv file:
[^query-hive-377.csv]
{noformat}
create database t3;
use t3;
create table vector1k(
t int,
si int,
i int,
b bigint,
f float,
d double,
dc decimal(38,18),
bo boolean,
s string,
s2 string,
ts timestamp,
ts2 timestamp,
dt date)
row format delimited fields terminated by ',';
load data local inpath "/query-hive-377.csv" OVERWRITE into table vector1k;
select * from vector1k; create table vectortab10k(
t int,
si int,
i int,
b bigint,
f float,
d double,
dc decimal(38,18),
bo boolean,
s string,
s2 string,
ts timestamp,
ts2 timestamp,
dt date)
stored by iceberg
stored as orc;
insert into vectortab10k select * from vector1k; select count(*) from
vectortab10k limit 10;
create table partition_transform_4(t int, ts timestamp) partitioned by
spec(month(ts)) stored by iceberg;
insert into table partition_transform_4 select t, ts from vectortab10k ;
{noformat}
> Hive Iceberg: Insert into partitioned table fails if the data is not
> clustered
> -------------------------------------------------------------------------------
>
> Key: HIVE-28087
> URL: https://issues.apache.org/jira/browse/HIVE-28087
> Project: Hive
> Issue Type: Task
> Reporter: Simhadri Govindappa
> Assignee: Simhadri Govindappa
> Priority: Major
> Attachments: query-hive-377.csv
>
>
> Insert into partitioned table fails with the following error if the data is
> not clustered.
> *Using the cluster by clause, it succeeds:*
> {noformat}
> 0: jdbc:hive2://localhost:10001/> insert into table partition_transform_4
> select t, ts from t1 cluster by ts;
> ----------------------------------------------------------------------------------------------
> VERTICES MODE STATUS TOTAL COMPLETED RUNNING PENDING
> FAILED KILLED
> ----------------------------------------------------------------------------------------------
> Map 1 .......... container SUCCEEDED 1 1 0 0
> 0 0
> Reducer 2 ...... container SUCCEEDED 1 1 0 0
> 0 0
> ----------------------------------------------------------------------------------------------
> VERTICES: 02/02 [==========================>>] 100% ELAPSED TIME: 9.47 s
> ----------------------------------------------------------------------------------------------
> INFO : Starting task [Stage-2:DEPENDENCY_COLLECTION] in serial mode
> INFO : Starting task [Stage-0:MOVE] in serial mode
> INFO : Completed executing
> command(queryId=root_20240222123244_0c448b32-4fd9-420d-be31-e39e2972af82);
> Time taken: 10.534 seconds
> 100 rows affected (10.696 seconds){noformat}
>
> *Without cluster by, it fails:*
> {noformat}
> 0: jdbc:hive2://localhost:10001/> insert into table partition_transform_4
> select t, ts from t1;
> ----------------------------------------------------------------------------------------------
> VERTICES MODE STATUS TOTAL COMPLETED RUNNING PENDING
> FAILED KILLED
> ----------------------------------------------------------------------------------------------
> Map 1 .......... container SUCCEEDED 1 1 0 0
> 0 0
> Reducer 2 container RUNNING 1 0 1 0
> 2 0
> ----------------------------------------------------------------------------------------------
> VERTICES: 01/02 [=============>>-------------] 50% ELAPSED TIME: 9.53 s
> ----------------------------------------------------------------------------------------------
> Caused by: java.lang.IllegalStateException: Incoming records violate the
> writer assumption that records are clustered by spec and by partition within
> each spec. Either cluster the incoming records or switch to fanout writers.
> Encountered records that belong to already closed files:
> partition 'ts_month=2027-03' in spec [
> 1000: ts_month: month(2)
> ]
> at org.apache.iceberg.io.ClusteredWriter.write(ClusteredWriter.java:96)
> at
> org.apache.iceberg.io.ClusteredDataWriter.write(ClusteredDataWriter.java:31)
> at
> org.apache.iceberg.mr.hive.writer.HiveIcebergRecordWriter.write(HiveIcebergRecordWriter.java:53)
> at
> org.apache.hadoop.hive.ql.exec.FileSinkOperator.process(FileSinkOperator.java:1181)
> at
> org.apache.hadoop.hive.ql.exec.vector.VectorFileSinkOperator.process(VectorFileSinkOperator.java:111)
> at
> org.apache.hadoop.hive.ql.exec.Operator.vectorForward(Operator.java:919)
> at
> org.apache.hadoop.hive.ql.exec.vector.VectorSelectOperator.process(VectorSelectOperator.java:158)
> at
> org.apache.hadoop.hive.ql.exec.tez.ReduceRecordSource.processVectorGroup(ReduceRecordSource.java:502)
> ... 20 more{noformat}
>
>
> A simple repro, using the attached csv file:
> [^query-hive-377.csv]
> {noformat}
> create database t3;
> use t3;
> create table vector1k(
> t int,
> si int,
> i int,
> b bigint,
> f float,
> d double,
> dc decimal(38,18),
> bo boolean,
> s string,
> s2 string,
> ts timestamp,
> ts2 timestamp,
> dt date)
> row format delimited fields terminated by ',';
> load data local inpath "/query-hive-377.csv" OVERWRITE into table vector1k;
> select * from vector1k; create table vectortab10k(
> t int,
> si int,
> i int,
> b bigint,
> f float,
> d double,
> dc decimal(38,18),
> bo boolean,
> s string,
> s2 string,
> ts timestamp,
> ts2 timestamp,
> dt date)
> stored by iceberg
> stored as orc;
>
> insert into vectortab10k select * from vector1k;
> select count(*) from vectortab10k ;
> create table partition_transform_4(t int, ts timestamp) partitioned by
> spec(month(ts)) stored by iceberg;
> insert into table partition_transform_4 select t, ts from vectortab10k ;
> {noformat}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)