liu created FLINK-20330:
---------------------------
Summary: Flink connector fails to support Hive external tables
(HBase or ES)
Key: FLINK-20330
URL: https://issues.apache.org/jira/browse/FLINK-20330
Project: Flink
Issue Type: Bug
Components: Connectors / Hive
Affects Versions: 1.11.2, 1.11.1, 1.11.0
Environment: TEST CODE LIKE THIS:
CREATE EXTERNAL TABLE hive_to_es (
key string,
value string
)
STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES(
'es.resource' = 'hive_to_es/_doc',
'es.index.auto.create' = 'TRUE',
'es.nodes'='192.168.1.111:9200,192.168.1.112:9200,192.168.1.113:9200'
);
insert into hive_to_es (key, value) values ('name','tom');
insert into hive_to_es (key, value) values ('yes','aaa');
select * from hive_to_es;
!image-2020-11-25-09-51-00-100.png|width=807,height=134!
Reporter: liu
Attachments: image-2020-11-25-09-42-13-102.png,
image-2020-11-25-09-51-00-100.png
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.connectors.hive.FlinkHiveException: Unable to instantiate the
hadoop
input format !image-2020-11-25-09-42-13-102.png|width=384,height=288!
We added a patch like this:
flink-connector-hive_2.12-1.11.2.jar
org/apache/flink/connectors/hive/HiveTableSink.java +134
ADD PATCH:
{code:java}
// code placeholder
if (sd.getOutputFormat() == null &&
"org.apache.hadoop.hive.hbase.HBaseSerDe".equals(sd.getSerdeInfo().getSerializationLib()))
{
sd.setOutputFormat("org.apache.hadoop.hive.hbase.HiveHBaseTableOutputFormat");
}
if (sd.getOutputFormat() == null &&
"org.elasticsearch.hadoop.hive.EsSerDe".equals(sd.getSerdeInfo().getSerializationLib()))
{ sd.setOutputFormat("org.elasticsearch.hadoop.hive.EsHiveOutputFormat");
}
{code}
org/apache/flink/connectors/hive/read/HiveTableInputFormat.java +305
ADD PATCH:
{code:java}
// code placeholder
if (sd.getInputFormat() == null &&
"org.apache.hadoop.hive.hbase.HBaseSerDe".equals(sd.getSerdeInfo().getSerializationLib()))
{ sd.setInputFormat("org.apache.hadoop.hive.hbase.HiveHBaseTableInputFormat");
jobConf.set("hbase.table.name",
partition.getTableProps().getProperty("hbase.table.name"));
jobConf.set("hbase.columns.mapping",
partition.getTableProps().getProperty("hbase.columns.mapping"));
}
if (sd.getInputFormat() == null &&
"org.elasticsearch.hadoop.hive.EsSerDe".equals(sd.getSerdeInfo().getSerializationLib()))
{ sd.setInputFormat("org.elasticsearch.hadoop.hive.EsHiveInputFormat");
jobConf.set("location", sd.getLocation()); for (Enumeration en =
partition.getTableProps().keys(); en.hasMoreElements();) { String key =
en.nextElement().toString(); if(key.startsWith("es.")){ jobConf.set(key,
partition.getTableProps().getProperty(key)); } }
}
{code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)