MOBIN created FLINK-36188:
-----------------------------
Summary: Disabling buffer flush in the HBase sink has no effect
Key: FLINK-36188
URL: https://issues.apache.org/jira/browse/FLINK-36188
Project: Flink
Issue Type: Bug
Components: Connectors / HBase
Affects Versions: hbase-3.0.0
Environment: Flink 1.16.3
Reporter: MOBIN
HBase table
||rowkey||col||
|1|1|
The user performs a lookup join against the HBase table, adds 1 to the col
value, and writes the result back to HBase.
{code:java}
/**
 * Reproduction for FLINK-36188: writes to an HBase sink configured with
 * {@code sink.buffer-flush.max-size = 0} and {@code sink.buffer-flush.max-rows = 0}
 * while lookup-joining the same table.
 *
 * <p>The table is seeded with row (1, 1). Five input rows with {@code user_id = 1} are then
 * lookup-joined against the table, and each writes back {@code col1 + 1}. The reporter expects
 * the final value to be 6 (presumably each lookup should observe the previous write).
 *
 * @throws Exception if any of the submitted statements fails
 */
@Test
void testTableSinkDisabledBufferFlush() throws Exception {
    StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(execEnv, streamSettings);

    // Sink table with both buffer-flush thresholds set to 0.
    tEnv.executeSql(
            "CREATE TABLE hTableForSink ("
                    + " rowkey INT PRIMARY KEY NOT ENFORCED,"
                    + " family1 ROW<col1 INT>"
                    + ") WITH ("
                    + " 'connector' = 'hbase-2.2',"
                    + " 'sink.buffer-flush.max-size' = '0',"
                    + " 'sink.buffer-flush.max-rows' = '0',"
                    + " 'table-name' = '"
                    + TEST_TABLE_6
                    + "',"
                    + " 'zookeeper.quorum' = '"
                    + getZookeeperQuorum()
                    + "'"
                    + ")");

    // Seed the table with rowkey 1 -> col1 = 1 and wait until the write has finished.
    String insert = "INSERT INTO hTableForSink VALUES(1, ROW(1))";
    tEnv.executeSql(insert).await();

    // Five input rows, all targeting rowkey 1.
    tEnv.executeSql(
            "CREATE VIEW user_click AS "
                    + " SELECT user_id, proctime() AS proc_time"
                    + " FROM ( "
                    + " VALUES(1), (1), (1), (1), (1)"
                    + " ) AS t (user_id);");

    // Lookup-join the sink table, increment col1 and write it back.
    // Wait for the job to finish so the verification query below does not race with it.
    tEnv.executeSql(
                    "INSERT INTO hTableForSink SELECT "
                            + " user_id as rowkey,"
                            + " ROW(CAST(family1.col1 + 1 AS INT))"
                            + " FROM user_click INNER JOIN hTableForSink"
                            + " FOR SYSTEM_TIME AS OF user_click.proc_time"
                            + " ON hTableForSink.rowkey = user_click.user_id;")
            .await();

    // Separate table definition over the same HBase table, used only to read the result.
    tEnv.executeSql(
            "CREATE TABLE hTableForQuery ("
                    + " rowkey INT PRIMARY KEY NOT ENFORCED,"
                    + " family1 ROW<col1 INT>"
                    + ") WITH ("
                    + " 'connector' = 'hbase-2.2',"
                    + " 'table-name' = '"
                    + TEST_TABLE_6
                    + "',"
                    + " 'zookeeper.quorum' = '"
                    + getZookeeperQuorum()
                    + "'"
                    + ")");

    String query = "SELECT rowkey, family1.col1 FROM hTableForQuery";
    TableResult firstResult = tEnv.executeSql(query);
    List<Row> firstResults = CollectionUtil.iteratorToList(firstResult.collect());

    // Initial value 1 plus five increments.
    String firstExpected = "+I[1, 6]";
    TestBaseUtils.compareResultAsText(firstResults, firstExpected);
} {code}
The test fails with:
{code:java}
org.junit.ComparisonFailure: Different elements in arrays: expected 1 elements
and received 1
expected: [+I[1, 6]]
received: [+I[1, 2]] expected:<+I[1, [6]]> but was:<+I[1, [2]]>
Expected :+I[1, 6]
Actual :+I[1, 2] {code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)