[
https://issues.apache.org/jira/browse/FLINK-36188?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
MOBIN updated FLINK-36188:
--------------------------
Description:
HBase table
||rowkey||col||
|1|1|
The job performs a lookup join against the HBase table, adds 1 to the col
value, and writes the result back to HBase
{code:java}
@Test
void testTableSinkDisabledBufferFlush() throws Exception {
StreamExecutionEnvironment execEnv =
StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(execEnv,
streamSettings);
tEnv.executeSql(
"CREATE TABLE hTableForSink ("
+ " rowkey INT PRIMARY KEY NOT ENFORCED,"
+ " family1 ROW<col1 INT>"
+ ") WITH ("
+ " 'connector' = 'hbase-2.2',"
+ " 'sink.buffer-flush.max-size' = '0',"
+ " 'sink.buffer-flush.max-rows' = '0',"
+ " 'table-name' = '"
+ TEST_TABLE_6
+ "',"
+ " 'zookeeper.quorum' = '"
+ getZookeeperQuorum()
+ "'"
+ ")");
String insert = "INSERT INTO hTableForSink VALUES(1, ROW(1))";
tEnv.executeSql(insert).await();
tEnv.executeSql(
"CREATE VIEW user_click AS "
+ " SELECT user_id, proctime() AS proc_time"
+ " FROM ( "
+ " VALUES(1), (1), (1), (1), (1)"
+ " ) AS t (user_id);");
tEnv.executeSql(
"INSERT INTO hTableForSink SELECT "
+ " user_id as rowkey,"
+ " ROW(CAST(family1.col1 + 1 AS INT))"
+ " FROM user_click INNER JOIN hTableForSink"
+ " FOR SYSTEM_TIME AS OF user_click.proc_time"
+ " ON hTableForSink.rowkey = user_click.user_id;");
tEnv.executeSql(
"CREATE TABLE hTableForQuery ("
+ " rowkey INT PRIMARY KEY NOT ENFORCED,"
+ " family1 ROW<col1 INT>"
+ ") WITH ("
+ " 'connector' = 'hbase-2.2',"
+ " 'table-name' = '"
+ TEST_TABLE_6
+ "',"
+ " 'zookeeper.quorum' = '"
+ getZookeeperQuorum()
+ "'"
+ ")");
String query = "SELECT rowkey, family1.col1 FROM hTableForQuery";
TableResult firstResult = tEnv.executeSql(query);
List<Row> firstResults =
CollectionUtil.iteratorToList(firstResult.collect());
String firstExpected = "+I[1, 6]";
TestBaseUtils.compareResultAsText(firstResults, firstExpected);
} {code}
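The test fails: the five joined rows are expected to increment col1 from 1 to
6, but the final value read back is 2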
{code:java}
org.junit.ComparisonFailure: Different elements in arrays: expected 1 elements
and received 1
expected: [+I[1, 6]]
received: [+I[1, 2]] expected:<+I[1, [6]]> but was:<+I[1, [2]]>
Expected :+I[1, 6]
Actual :+I[1, 2] {code}
was:
HBase table
||rowkey||col||
|1|1|
The user lookup joins the hbase table, adds 1 to the col value, and writes it
back to hbase
{code:java}
@Test
void testTableSinkDisabledBufferFlush() throws Exception {
StreamExecutionEnvironment execEnv =
StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(execEnv,
streamSettings);
tEnv.executeSql(
"CREATE TABLE hTableForSink ("
+ " rowkey INT PRIMARY KEY NOT ENFORCED,"
+ " family1 ROW<col1 INT>"
+ ") WITH ("
+ " 'connector' = 'hbase-2.2',"
+ " 'sink.buffer-flush.max-size' = '0',"
+ " 'sink.buffer-flush.max-rows' = '0',"
+ " 'table-name' = '"
+ TEST_TABLE_6
+ "',"
+ " 'zookeeper.quorum' = '"
+ getZookeeperQuorum()
+ "'"
+ ")"); String insert = "INSERT INTO
hTableForSink VALUES(1, ROW(1))";
tEnv.executeSql(insert).await(); tEnv.executeSql(
"CREATE VIEW user_click AS "
+ " SELECT user_id, proctime() AS proc_time"
+ " FROM ( "
+ " VALUES(1), (1), (1), (1), (1)"
+ " ) AS t (user_id);"); tEnv.executeSql(
"INSERT INTO hTableForSink SELECT "
+ " user_id as rowkey,"
+ " ROW(CAST(family1.col1 + 1 AS INT))"
+ " FROM user_click INNER JOIN hTableForSink"
+ " FOR SYSTEM_TIME AS OF user_click.proc_time"
+ " ON hTableForSink.rowkey = user_click.user_id;");
tEnv.executeSql(
"CREATE TABLE hTableForQuery ("
+ " rowkey INT PRIMARY KEY NOT ENFORCED,"
+ " family1 ROW<col1 INT>"
+ ") WITH ("
+ " 'connector' = 'hbase-2.2',"
+ " 'table-name' = '"
+ TEST_TABLE_6
+ "',"
+ " 'zookeeper.quorum' = '"
+ getZookeeperQuorum()
+ "'"
+ ")");
String query = "SELECT rowkey, family1.col1 FROM hTableForQuery";
TableResult firstResult = tEnv.executeSql(query);
List<Row> firstResults =
CollectionUtil.iteratorToList(firstResult.collect());
String firstExpected = "+I[1, 6]";
TestBaseUtils.compareResultAsText(firstResults, firstExpected);
} {code}
test failed
{code:java}
org.junit.ComparisonFailure: Different elements in arrays: expected 1 elements
and received 1
expected: [+I[1, 6]]
received: [+I[1, 2]] expected:<+I[1, [6]]> but was:<+I[1, [2]]>
Expected :+I[1, 6]
Actual :+I[1, 2] {code}
> HBase disable buffer flush lose efficacy
> ----------------------------------------
>
> Key: FLINK-36188
> URL: https://issues.apache.org/jira/browse/FLINK-36188
> Project: Flink
> Issue Type: Bug
> Components: Connectors / HBase
> Affects Versions: hbase-3.0.0
> Environment: Flink 1.16.3
> Reporter: MOBIN
> Priority: Critical
>
> HBase table
> ||rowkey||col||
> |1|1|
> The job performs a lookup join against the HBase table, adds 1 to the col
> value, and writes the result back to HBase
> {code:java}
> @Test
> void testTableSinkDisabledBufferFlush() throws Exception {
> StreamExecutionEnvironment execEnv =
> StreamExecutionEnvironment.getExecutionEnvironment();
> StreamTableEnvironment tEnv = StreamTableEnvironment.create(execEnv,
> streamSettings);
> tEnv.executeSql(
> "CREATE TABLE hTableForSink ("
> + " rowkey INT PRIMARY KEY NOT ENFORCED,"
> + " family1 ROW<col1 INT>"
> + ") WITH ("
> + " 'connector' = 'hbase-2.2',"
> + " 'sink.buffer-flush.max-size' = '0',"
> + " 'sink.buffer-flush.max-rows' = '0',"
> + " 'table-name' = '"
> + TEST_TABLE_6
> + "',"
> + " 'zookeeper.quorum' = '"
> + getZookeeperQuorum()
> + "'"
> + ")");
> String insert = "INSERT INTO hTableForSink VALUES(1, ROW(1))";
> tEnv.executeSql(insert).await();
> tEnv.executeSql(
> "CREATE VIEW user_click AS "
> + " SELECT user_id, proctime() AS proc_time"
> + " FROM ( "
> + " VALUES(1), (1), (1), (1), (1)"
> + " ) AS t (user_id);");
>
> tEnv.executeSql(
> "INSERT INTO hTableForSink SELECT "
> + " user_id as rowkey,"
> + " ROW(CAST(family1.col1 + 1 AS INT))"
> + " FROM user_click INNER JOIN hTableForSink"
> + " FOR SYSTEM_TIME AS OF user_click.proc_time"
> + " ON hTableForSink.rowkey = user_click.user_id;");
>
> tEnv.executeSql(
> "CREATE TABLE hTableForQuery ("
> + " rowkey INT PRIMARY KEY NOT ENFORCED,"
> + " family1 ROW<col1 INT>"
> + ") WITH ("
> + " 'connector' = 'hbase-2.2',"
> + " 'table-name' = '"
> + TEST_TABLE_6
> + "',"
> + " 'zookeeper.quorum' = '"
> + getZookeeperQuorum()
> + "'"
> + ")");
> String query = "SELECT rowkey, family1.col1 FROM hTableForQuery";
>
> TableResult firstResult = tEnv.executeSql(query);
> List<Row> firstResults =
> CollectionUtil.iteratorToList(firstResult.collect());
> String firstExpected = "+I[1, 6]";
> TestBaseUtils.compareResultAsText(firstResults, firstExpected);
> } {code}
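> The test fails: the five joined rows are expected to increment col1 from 1
> to 6, but the final value read back is 2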
> {code:java}
> org.junit.ComparisonFailure: Different elements in arrays: expected 1
> elements and received 1
> expected: [+I[1, 6]]
> received: [+I[1, 2]] expected:<+I[1, [6]]> but was:<+I[1, [2]]>
> Expected :+I[1, 6]
> Actual :+I[1, 2] {code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)