Paul Wu created FLINK-8356:
------------------------------
Summary: JDBCAppendTableSink does not work for Hbase Phoenix Driver
Key: FLINK-8356
URL: https://issues.apache.org/jira/browse/FLINK-8356
Project: Flink
Issue Type: Bug
Components: Table API & SQL
Affects Versions: 1.4.0
Reporter: Paul Wu
The following code runs without errors, but no data is inserted into the
HBase table. The same code does work for MySQL (see the commented-out code).
The Phoenix driver is from
https://mvnrepository.com/artifact/org.apache.phoenix/phoenix/4.7.0-HBase-1.1
String query = "select CURRENT_DATE SEGMENTSTARTTIME, CURRENT_DATE SEGMENTENDTIME, "
    + "cast (imsi as varchar) imsi, cast(imei as varchar) imei from ts";
Table table = ste.sqlQuery(query);
JDBCAppendTableSinkBuilder jdbc = JDBCAppendTableSink.builder();
jdbc.setDrivername("org.apache.phoenix.jdbc.PhoenixDriver");
jdbc.setDBUrl("jdbc:phoenix:hosts:2181:/hbase-unsecure");
jdbc.setQuery("upsert INTO GEO_ANALYTICS_STREAMING_DATA "
    + "(SEGMENTSTARTTIME,SEGMENTENDTIME, imsi, imei) values (?,?,?, ?)");
// JDBCAppendTableSinkBuilder jdbc = JDBCAppendTableSink.builder();
// jdbc.setDrivername("com.mysql.jdbc.Driver");
// jdbc.setDBUrl("jdbc:mysql://localhost/test");
// jdbc.setUsername("root").setPassword("");
// jdbc.setQuery("insert INTO GEO_ANALYTICS_STREAMING_DATA "
//     + "(SEGMENTSTARTTIME,SEGMENTENDTIME, imsi, imei) values (?,?,?, ?)");
// jdbc.setBatchSize(1);
jdbc.setParameterTypes(Types.SQL_DATE, Types.SQL_DATE, Types.STRING, Types.STRING);
JDBCAppendTableSink sink = jdbc.build();
table.writeToSink(sink);
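
A possible next diagnostic step, offered as an unverified assumption rather than a confirmed cause: since the same sink works with MySQL, it may be worth comparing the auto-commit state of the two JDBC connections. If the Phoenix connection has auto-commit disabled, executed statements stay pending until commit() is called, which would match "runs without errors, but no data is inserted". The sketch below uses only the standard java.sql API; the class AutoCommitCheck and its methods are hypothetical helpers for illustration and are not part of Flink or Phoenix.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

// Hypothetical diagnostic helper; not part of Flink or Phoenix.
final class AutoCommitCheck {

    // True when statements on this connection stay pending until commit() is called.
    static boolean needsExplicitCommit(Connection conn) throws SQLException {
        return !conn.getAutoCommit();
    }

    // Runs one parameterized statement and commits it if the connection requires that.
    static int executeAndCommit(Connection conn, String sql, Object... params)
            throws SQLException {
        int affected;
        try (PreparedStatement stmt = conn.prepareStatement(sql)) {
            for (int i = 0; i < params.length; i++) {
                stmt.setObject(i + 1, params[i]); // JDBC parameter indexes are 1-based
            }
            affected = stmt.executeUpdate();
        }
        if (needsExplicitCommit(conn)) {
            conn.commit();
        }
        return affected;
    }

    public static void main(String[] args) throws SQLException {
        if (args.length < 1) {
            System.out.println("usage: AutoCommitCheck <jdbc-url>");
            return;
        }
        // The JDBC driver for the given URL must be on the classpath.
        try (Connection conn = DriverManager.getConnection(args[0])) {
            System.out.println("autoCommit=" + conn.getAutoCommit());
            System.out.println("needsExplicitCommit=" + needsExplicitCommit(conn));
        }
    }
}
```

Running main once with the Phoenix URL from the code above and once with the MySQL URL would show whether the two connections differ in this respect. If they do, executeAndCommit with the same upsert statement could confirm whether an explicit commit makes the rows appear in the HBase table.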