[
https://issues.apache.org/jira/browse/HIVE-28790?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Marta Kuczora updated HIVE-28790:
---------------------------------
Description:
*Steps to reproduce:*
{code:java}
set mapreduce.job.reduces=7;
create external table ext(a int) stored as textfile;
insert into table ext values(1),(2),(3),(4),(5),(6),(7), (8), (9), (12);
create table full_acid(a int) stored as orc
tblproperties("transactional"="true");
insert into table full_acid select * from ext where a != 3 and a <=7 group by a;
insert into table full_acid select * from ext where a>7 group by a;
set mapreduce.job.reduces=1;
delete from full_acid where a in (2, 12);
{code}
The delete will fail with the following exception:
{code}
Caused by: java.lang.ArrayIndexOutOfBoundsException: 6
    at org.apache.hadoop.hive.ql.exec.FileSinkOperator$FSPaths.closeWriters(FileSinkOperator.java:258)
{code}
The problem is in the FileSinkOperator.createDynamicBucket method:
{code}
public int createDynamicBucket(int bucketNum) {
  // this assumes all paths are bucket names (which means no lookup is needed)
  int writerOffset = bucketNum;
  if (updaters.length <= writerOffset) {
    this.updaters = Arrays.copyOf(updaters, writerOffset + 1);
    this.outPaths = Arrays.copyOf(outPaths, writerOffset + 1);
    this.finalPaths = Arrays.copyOf(finalPaths, writerOffset + 1);
  }

  if (this.finalPaths[writerOffset] == null) {
    if (conf.isDirectInsert()) {
      this.outPathsCommitted = Arrays.copyOf(outPathsCommitted, writerOffset + 1);
      this.finalPaths[writerOffset] = buildTmpPath();
      this.outPaths[writerOffset] = buildTmpPath();
    } else {
      // uninitialized bucket
      String bucketName = Utilities.replaceTaskIdFromFilename(Utilities.getTaskId(hconf), bucketNum);
      this.finalPaths[writerOffset] = new Path(bDynParts ? buildTmpPath() : parent, bucketName);
      this.outPaths[writerOffset] = new Path(buildTaskOutputTempPath(), bucketName);
    }
  }
  return writerOffset;
}
} // class FSPaths
{code}
In the first part, the updaters, outPaths and finalPaths arrays are copied if the writerOffset is not smaller than their length, so these arrays are extended.
But in the second part, when the outPathsCommitted array is copied, its length is not compared with the writerOffset, so the outPathsCommitted array can actually be shrunk. When this happens, it leads to the ArrayIndexOutOfBoundsException when closing the writers, because the outPathsCommitted array ends up shorter than the updaters array.
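Just for illustration, a minimal sketch of how the direct-insert branch could guard the copy the same way the other arrays are guarded. This is only a sketch based on the snippet above, not the actual patch:
{code:java}
// Sketch only: grow outPathsCommitted when needed instead of
// unconditionally copying it to writerOffset + 1 elements,
// mirroring the length check done for updaters/outPaths/finalPaths above.
if (conf.isDirectInsert()) {
  if (outPathsCommitted.length <= writerOffset) {
    this.outPathsCommitted = Arrays.copyOf(outPathsCommitted, writerOffset + 1);
  }
  this.finalPaths[writerOffset] = buildTmpPath();
  this.outPaths[writerOffset] = buildTmpPath();
}
{code}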
*About the reproduction:*
The first insert into the full_acid table creates files for buckets 1, 2, 3, 5 and 6.
The second insert creates files for buckets 1, 4 and 6.
The encoded bucket property for bucket 6 is 537264128 and for bucket 4 it is 537133056, so the value for bucket 4 is smaller than the one for bucket 6.
To reproduce the issue, we need to delete a row from bucket 6 and a row from bucket 4 in the same statement, and make sure that both rows are processed by the same FileSinkOperator. The operator processes the row from bucket 6 first, so it copies the arrays in createDynamicBucket with writerOffset 6. Then the row from bucket 4 arrives, and createDynamicBucket performs the second array copy incorrectly: the finalPaths array keeps its size of 7, but the outPathsCommitted array is copied down to size 4 + 1 = 5. This causes the exception when closing the writers.
By setting the reducer number to 1 before the delete, both rows are processed by the same FileSinkOperator.
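For reference, a small self-contained sketch of how these bucket property values map to bucket IDs. It assumes the V1 bucket property layout (top 3 bits version, 1 reserved bit, then 12 bits for the bucket/writer ID); treat the bit layout as an assumption of this illustration, not a definitive description of BucketCodec:
{code:java}
public class BucketPropertyDecodeDemo {
  // Assumed V1 layout: | 3 bits version | 1 bit reserved | 12 bits bucket id | 16 bits rest |
  static int decodeBucketId(int bucketProperty) {
    return (bucketProperty >>> 16) & 0xFFF;
  }

  public static void main(String[] args) {
    System.out.println(decodeBucketId(537264128)); // 6
    System.out.println(decodeBucketId(537133056)); // 4
  }
}
{code}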
> ACID deletes are failing with ArrayIndexOutOfBoundsException when direct
> insert is enabled
> -------------------------------------------------------------------------------------------
>
> Key: HIVE-28790
> URL: https://issues.apache.org/jira/browse/HIVE-28790
> Project: Hive
> Issue Type: Bug
> Affects Versions: 4.0.0
> Reporter: Marta Kuczora
> Priority: Major
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)