[ https://issues.apache.org/jira/browse/FLINK-29013?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
luoyuxia updated FLINK-29013:
-----------------------------
Description:
Running the following statement in the Hive dialect throws a NullPointerException (NPE):
{code:java}
tableEnv.executeSql(
        "INSERT OVERWRITE TABLE dest1\n"
                + "SELECT TRANSFORM(*)\n"
                + "  USING 'cat'\n"
                + "  AS mydata STRING\n"
                + "  ROW FORMAT SERDE\n"
                + "    'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'\n"
                + "  WITH SERDEPROPERTIES (\n"
                + "    'serialization.last.column.takes.rest'='true'\n"
                + "  )\n"
                + "  RECORDREADER 'org.apache.hadoop.hive.ql.exec.BinaryRecordReader'\n"
                + "FROM src");{code}
The NPE is thrown by this call in HiveScriptTransformOutReadThread:
{code:java}
// HiveScriptTransformOutReadThread
recordReader.next(reusedWritableObject); {code}
For BinaryRecordReader, BinaryRecordReader#createRow must be called first, because that is where its internal bytes buffer is initialized:
{code:java}
// BinaryRecordReader
public Writable createRow() throws IOException {
    // Allocates the buffer that next() later reads into.
    bytes = new BytesWritable();
    bytes.setCapacity(maxRecordLength);
    return bytes;
}{code}
Otherwise, bytes is still null and the following code throws the NPE:
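{code:java}
// BinaryRecordReader
public int next(Writable row) throws IOException {
    // Reads into the field 'bytes' (not the 'row' argument), so it fails with an NPE
    // if createRow() has never been called.
    int recordLength = in.read(bytes.get(), 0, maxRecordLength);
    if (recordLength >= 0) {
        bytes.setSize(recordLength);
    }
    return recordLength;
}{code}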
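To make the failure mode concrete, here is a small self-contained sketch that needs no Hive or Hadoop dependency. Every name in it is hypothetical: Buffer stands in for BytesWritable, LazyBinaryReader mirrors the createRow()/next() contract of BinaryRecordReader quoted above, and readAll is an illustrative read loop, not the actual code in HiveScriptTransformOutReadThread and not the eventual fix. It assumes, as the quoted snippets show, that next() fills the buffer allocated by createRow() and ignores its own argument, so a caller has to obtain its reusable row from createRow() before the first next().

{code:java}
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class BinaryReaderInitDemo {

    // Hypothetical stand-in for Hadoop's BytesWritable: a byte buffer with a logical size.
    static final class Buffer {
        byte[] data = new byte[0];
        int size;

        void setCapacity(int capacity) { data = new byte[capacity]; }

        void setSize(int newSize) { size = newSize; }
    }

    // Hypothetical stand-in mirroring the createRow()/next() contract of Hive's BinaryRecordReader.
    static final class LazyBinaryReader {
        private final InputStream in;
        private final int maxRecordLength;
        private Buffer bytes; // stays null until createRow() is called

        LazyBinaryReader(InputStream in, int maxRecordLength) {
            this.in = in;
            this.maxRecordLength = maxRecordLength;
        }

        Buffer createRow() {
            bytes = new Buffer();
            bytes.setCapacity(maxRecordLength);
            return bytes;
        }

        // Like BinaryRecordReader#next, this ignores its argument and fills the internal buffer.
        int next(Buffer row) throws IOException {
            int recordLength = in.read(bytes.data, 0, maxRecordLength);
            if (recordLength >= 0) {
                bytes.setSize(recordLength);
            }
            return recordLength;
        }
    }

    // Hypothetical read loop: obtains the reusable row from createRow() before calling next().
    static List<String> readAll(LazyBinaryReader reader) throws IOException {
        List<String> records = new ArrayList<>();
        Buffer row = reader.createRow();
        while (reader.next(row) > 0) {
            records.add(new String(row.data, 0, row.size, StandardCharsets.UTF_8));
        }
        return records;
    }

    static InputStream input(String s) {
        return new ByteArrayInputStream(s.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) throws IOException {
        // Without createRow(): the internal buffer is null, so next() throws an NPE.
        LazyBinaryReader broken = new LazyBinaryReader(input("abc"), 4);
        try {
            broken.next(new Buffer());
        } catch (NullPointerException e) {
            System.out.println("NPE without createRow()");
        }

        // With createRow(): reads succeed, in chunks of at most maxRecordLength bytes.
        System.out.println(readAll(new LazyBinaryReader(input("abcdefg"), 4)));
    }
}{code}

Running main prints "NPE without createRow()" followed by "[abcd, efg]". Applied to the real reader thread, the same idea would mean initializing reusedWritableObject from recordReader.createRow() when the record reader is a BinaryRecordReader; this is a suggestion under the assumptions above, not a confirmed patch.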
> Fail to use "transform using" when record reader is binary record reader in Hive dialect
> -----------------------------------------------------------------------------------------
>
> Key: FLINK-29013
> URL: https://issues.apache.org/jira/browse/FLINK-29013
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Hive
> Affects Versions: 1.16.0
> Reporter: luoyuxia
> Priority: Major
> Fix For: 1.16.0
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)