[
https://issues.apache.org/jira/browse/FLINK-20330?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Flink Jira Bot updated FLINK-20330:
-----------------------------------
Labels: auto-deprioritized-major stale-minor (was:
auto-deprioritized-major)
I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help
the community manage its development. I see this issue has been marked as
Minor but is unassigned and neither itself nor its Sub-Tasks have been updated
for 180 days. I have gone ahead and marked it "stale-minor". If this ticket is
still Minor, please either assign yourself or give an update. Afterwards,
please remove the label or in 7 days the issue will be deprioritized.
> Flink connector has error in support hive external tables (hbase or es)
> -----------------------------------------------------------------------
>
> Key: FLINK-20330
> URL: https://issues.apache.org/jira/browse/FLINK-20330
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Hive
> Affects Versions: 1.11.0, 1.11.1, 1.11.2
> Environment: TEST CODE LIKE THIS:
> CREATE EXTERNAL TABLE hive_to_es (
> key string,
> value string
> )
> STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
> TBLPROPERTIES(
> 'es.resource' = 'hive_to_es/_doc',
> 'es.index.auto.create' = 'TRUE',
> 'es.nodes'='192.168.1.111:9200,192.168.1.112:9200,192.168.1.113:9200'
> );
> insert into hive_to_es (key, value) values ('name','tom');
> insert into hive_to_es (key, value) values ('yes','aaa');
> select * from hive_to_es;
>
> !image-2020-11-25-09-51-00-100.png|width=807,height=134!
> Reporter: liu
> Priority: Minor
> Labels: auto-deprioritized-major, stale-minor
> Attachments: image-2020-11-25-09-42-13-102.png,
> image-2020-11-25-09-51-00-100.png
>
>
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.connectors.hive.FlinkHiveException: Unable to instantiate
> the hadoop input format
>
> !image-2020-11-25-09-42-13-102.png|width=384,height=288!
>
>
> We worked around this with the following patch:
>
> flink-connector-hive_2.12-1.11.2.jar
> org/apache/flink/connectors/hive/HiveTableSink.java +134
>
> ADD PATCH:
>
> {code:java}
> // Default the output format for storage-handler tables whose descriptor lacks one.
> if (sd.getOutputFormat() == null
>         && "org.apache.hadoop.hive.hbase.HBaseSerDe".equals(sd.getSerdeInfo().getSerializationLib())) {
>     sd.setOutputFormat("org.apache.hadoop.hive.hbase.HiveHBaseTableOutputFormat");
> }
> if (sd.getOutputFormat() == null
>         && "org.elasticsearch.hadoop.hive.EsSerDe".equals(sd.getSerdeInfo().getSerializationLib())) {
>     sd.setOutputFormat("org.elasticsearch.hadoop.hive.EsHiveOutputFormat");
> }
> {code}
> org/apache/flink/connectors/hive/read/HiveTableInputFormat.java + 305
> ADD PATCH:
> {code:java}
> // Default the input format for storage-handler tables and pass the table
> // properties the handler needs through to the JobConf.
> if (sd.getInputFormat() == null
>         && "org.apache.hadoop.hive.hbase.HBaseSerDe".equals(sd.getSerdeInfo().getSerializationLib())) {
>     sd.setInputFormat("org.apache.hadoop.hive.hbase.HiveHBaseTableInputFormat");
>     jobConf.set("hbase.table.name",
>             partition.getTableProps().getProperty("hbase.table.name"));
>     jobConf.set("hbase.columns.mapping",
>             partition.getTableProps().getProperty("hbase.columns.mapping"));
> }
> if (sd.getInputFormat() == null
>         && "org.elasticsearch.hadoop.hive.EsSerDe".equals(sd.getSerdeInfo().getSerializationLib())) {
>     sd.setInputFormat("org.elasticsearch.hadoop.hive.EsHiveInputFormat");
>     jobConf.set("location", sd.getLocation());
>     // Copy every "es."-prefixed table property into the job configuration.
>     for (Enumeration<?> en = partition.getTableProps().keys(); en.hasMoreElements();) {
>         String key = en.nextElement().toString();
>         if (key.startsWith("es.")) {
>             jobConf.set(key, partition.getTableProps().getProperty(key));
>         }
>     }
> }
> {code}
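
Both hunks of the quoted patch follow the same pattern: when the storage descriptor has no input/output format, pick one from the SerDe class name, and for Elasticsearch copy the "es."-prefixed table properties into the job configuration. A minimal, dependency-free sketch of that pattern could look like the code below. The class StorageHandlerDefaults, the holder type Formats, and both method names are hypothetical and are not part of Flink or Hive. A plain Map stands in for Hadoop's JobConf, and only the class-name strings come from the patch.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;

// Hypothetical helper, not a Flink or Hive class.
public class StorageHandlerDefaults {

    /** Input/output format class names associated with one SerDe. */
    public static final class Formats {
        public final String inputFormat;
        public final String outputFormat;

        Formats(String inputFormat, String outputFormat) {
            this.inputFormat = inputFormat;
            this.outputFormat = outputFormat;
        }
    }

    // SerDe class name -> formats; the strings are the ones quoted in the patch.
    private static final Map<String, Formats> DEFAULTS = new LinkedHashMap<>();

    static {
        DEFAULTS.put("org.apache.hadoop.hive.hbase.HBaseSerDe", new Formats(
                "org.apache.hadoop.hive.hbase.HiveHBaseTableInputFormat",
                "org.apache.hadoop.hive.hbase.HiveHBaseTableOutputFormat"));
        DEFAULTS.put("org.elasticsearch.hadoop.hive.EsSerDe", new Formats(
                "org.elasticsearch.hadoop.hive.EsHiveInputFormat",
                "org.elasticsearch.hadoop.hive.EsHiveOutputFormat"));
    }

    /** Returns the default formats for a SerDe, or empty if the SerDe is unknown. */
    public static Optional<Formats> lookup(String serializationLib) {
        return Optional.ofNullable(DEFAULTS.get(serializationLib));
    }

    /** Copies every table property whose key starts with the given prefix. */
    public static Map<String, String> copyWithPrefix(Properties tableProps, String prefix) {
        Map<String, String> copied = new LinkedHashMap<>();
        for (String key : tableProps.stringPropertyNames()) {
            if (key.startsWith(prefix)) {
                copied.put(key, tableProps.getProperty(key));
            }
        }
        return copied;
    }

    public static void main(String[] args) {
        Properties tableProps = new Properties();
        tableProps.setProperty("es.resource", "hive_to_es/_doc");
        tableProps.setProperty("es.index.auto.create", "TRUE");
        tableProps.setProperty("hbase.table.name", "ignored_here");

        // Only the "es."-prefixed entries are carried over.
        System.out.println(copyWithPrefix(tableProps, "es."));

        lookup("org.elasticsearch.hadoop.hive.EsSerDe")
                .ifPresent(f -> System.out.println(f.inputFormat + " / " + f.outputFormat));
    }
}
```

One difference from the patch: the sketch iterates with Properties.stringPropertyNames(), which only returns String keys with String values, whereas the patch walks keys() and calls toString() on each key. A table-driven lookup like this would also keep the read and write paths from drifting apart if more storage handlers were added, but whether that fits the connector's design is for the maintainers to judge.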
--
This message was sent by Atlassian Jira
(v8.20.1#820001)