[
https://issues.apache.org/jira/browse/FLINK-36188?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Ferenc Csaky updated FLINK-36188:
---------------------------------
Fix Version/s: hbase-4.0.0
> HBase disable buffer flush lose efficacy
> ----------------------------------------
>
> Key: FLINK-36188
> URL: https://issues.apache.org/jira/browse/FLINK-36188
> Project: Flink
> Issue Type: Bug
> Components: Connectors / HBase
> Affects Versions: hbase-3.0.0
> Environment: Flink 1.16.3
> Reporter: MOBIN
> Assignee: MOBIN
> Priority: Critical
> Labels: pull-request-available
> Fix For: hbase-4.0.0
>
>
> HBase table
> ||rowkey||col||
> |1|1|
> The user lookup joins the hbase table, adds 1 to the col value, and writes it
> back to hbase
> {code:java}
> @Test
> void testTableSinkDisabledBufferFlush() throws Exception {
> StreamExecutionEnvironment execEnv =
> StreamExecutionEnvironment.getExecutionEnvironment();
> StreamTableEnvironment tEnv = StreamTableEnvironment.create(execEnv,
> streamSettings);
> tEnv.executeSql(
> "CREATE TABLE hTableForSink ("
> + " rowkey INT PRIMARY KEY NOT ENFORCED,"
> + " family1 ROW<col1 INT>"
> + ") WITH ("
> + " 'connector' = 'hbase-2.2',"
> + " 'sink.buffer-flush.max-size' = '0',"
> + " 'sink.buffer-flush.max-rows' = '0',"
> + " 'table-name' = '"
> + TEST_TABLE_6
> + "',"
> + " 'zookeeper.quorum' = '"
> + getZookeeperQuorum()
> + "'"
> + ")");
> String insert = "INSERT INTO hTableForSink VALUES(1, ROW(1))";
> tEnv.executeSql(insert).await();
> tEnv.executeSql(
> "CREATE VIEW user_click AS "
> + " SELECT user_id, proctime() AS proc_time"
> + " FROM ( "
> + " VALUES(1), (1), (1), (1), (1)"
> + " ) AS t (user_id);");
>
> tEnv.executeSql(
> "INSERT INTO hTableForSink SELECT "
> + " user_id as rowkey,"
> + " ROW(CAST(family1.col1 + 1 AS INT))"
> + " FROM user_click INNER JOIN hTableForSink"
> + " FOR SYSTEM_TIME AS OF user_click.proc_time"
> + " ON hTableForSink.rowkey = user_click.user_id;");
>
> tEnv.executeSql(
> "CREATE TABLE hTableForQuery ("
> + " rowkey INT PRIMARY KEY NOT ENFORCED,"
> + " family1 ROW<col1 INT>"
> + ") WITH ("
> + " 'connector' = 'hbase-2.2',"
> + " 'table-name' = '"
> + TEST_TABLE_6
> + "',"
> + " 'zookeeper.quorum' = '"
> + getZookeeperQuorum()
> + "'"
> + ")");
> String query = "SELECT rowkey, family1.col1 FROM hTableForQuery";
>
> TableResult firstResult = tEnv.executeSql(query);
> List<Row> firstResults =
> CollectionUtil.iteratorToList(firstResult.collect());
> String firstExpected = "+I[1, 6]";
> TestBaseUtils.compareResultAsText(firstResults, firstExpected);
> } {code}
> test failed
> {code:java}
> org.junit.ComparisonFailure: Different elements in arrays: expected 1
> elements and received 1
> expected: [+I[1, 6]]
> received: [+I[1, 2]] expected:<+I[1, [6]]> but was:<+I[1, [2]]>
> Expected :+I[1, 6]
> Actual :+I[1, 2] {code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)