[
https://issues.apache.org/jira/browse/FLINK-25330?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17464946#comment-17464946
]
Bruce Wong commented on FLINK-25330:
------------------------------------
Hi, [~jingge]
Thanks for your reply. I updated the test data (bundle_data1.zip); the new test
result is in test_res2.png and is the same as before.
The test code and configuration are as follows.
{code:java}
package com.bruce;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import static
org.apache.flink.configuration.ConfigConstants.LOCAL_START_WEBSERVER;
public class KafkaToHBase {
public static void main(String[] args) {
Configuration cfg = new Configuration();
cfg.setBoolean(LOCAL_START_WEBSERVER, true);
StreamExecutionEnvironment env =
StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(cfg);
env.setParallelism(1);
EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env,
envSettings);
TableConfig config = tEnv.getConfig();
// config.setIdleStateRetention(Duration.ofHours(2));
String source = "CREATE TEMPORARY TABLE IF NOT EXISTS
kafka_llspay_bundles(\n" +
" id STRING,\n" +
" category_id STRING,\n" +
" upc STRING,\n" +
" `name` STRING,\n" +
" price_cents STRING,\n" +
" original_price_cents STRING,\n" +
" short_desc STRING,\n" +
" desc STRING,\n" +
" cover_url STRING,\n" +
" created_at STRING,\n" +
" updated_at STRING,\n" +
" deleted_at STRING,\n" +
" extra STRING,\n" +
" status STRING,\n" +
" scholarship_cents STRING,\n" +
" is_payback STRING,\n" +
" is_support_iap STRING,\n" +
" iap_product_id STRING,\n" +
" neo_product_code STRING,\n" +
" paid_redirect_url STRING,\n" +
" subscription_type STRING\n" +
") WITH (\n" +
" 'connector' = 'kafka',\n" +
" 'topic' = 'dim-bundles',\n" +
" 'properties.bootstrap.servers' = 'localhost:9092',\n" +
" 'properties.group.id' = 'vvp_dev',\n" +
" 'scan.startup.mode' = 'latest-offset',\n" +
" 'value.debezium-json.schema-include' = 'true',\n" +
" 'value.format' = 'debezium-json',\n" +
" 'value.debezium-json.ignore-parse-errors' = 'true'\n" +
")";
String sink = "CREATE TEMPORARY TABLE IF NOT EXISTS dim_hbase (\n" +
" rowkey STRING,\n" +
" cf ROW<id STRING, category_id STRING, upc STRING, `name`
STRING, price_cents STRING, original_price_cents STRING, short_desc STRING,
`desc` STRING, cover_url STRING, created_at STRING, updated_at STRING,
deleted_at STRING, extra STRING, status STRING, scholarship_cents STRING,
is_payback STRING, is_support_iap STRING, iap_product_id STRING,
neo_product_code STRING, paid_redirect_url STRING, subscription_type STRING>\n"
+
") with (\n" +
" 'connector'='hbase-2.2',\n" +
" 'table-name'='dim_hbase',\n" +
" 'zookeeper.quorum'='localhost:2181',\n" +
" 'sink.buffer-flush.max-size' = '0',\n" +
" 'sink.buffer-flush.max-rows' = '1'\n" +
")";
String dml = "INSERT INTO dim_hbase\n" +
"SELECT \n" +
" upc as rowkey,\n" +
" ROW(\n" +
" id, category_id, upc, `name`, price_cents,
original_price_cents, short_desc, `desc` , cover_url , created_at, updated_at,
deleted_at, extra , status , scholarship_cents , is_payback , is_support_iap ,
iap_product_id , neo_product_code , paid_redirect_url , subscription_type)\n" +
"FROM kafka_llspay_bundles";
tEnv.executeSql(source);
tEnv.executeSql(sink);
tEnv.executeSql(dml);
}
} {code}
> Flink SQL doesn't retract all versions of Hbase data
> ----------------------------------------------------
>
> Key: FLINK-25330
> URL: https://issues.apache.org/jira/browse/FLINK-25330
> Project: Flink
> Issue Type: Bug
> Components: Connectors / HBase
> Reporter: Bruce Wong
> Assignee: Jing Ge
> Priority: Major
> Labels: pull-request-available
> Attachments: Flink-SQL-Test.zip, bundle_data.zip, bundle_data1.zip,
> image-2021-12-15-20-05-18-236.png, test_res.png, test_res2.png, test_res_1.png
>
>
> h2. Background
> When we use CDC to synchronize MySQL data to HBase, we found that when a MySQL
> row is deleted, HBase deletes only the latest version of the corresponding
> rowkey. Older versions of the row still exist, so readers end up seeing stale
> data. I think this is a bug in the HBase connector.
> The following figure shows the HBase data before and after the MySQL row is
> deleted.
> !image-2021-12-15-20-05-18-236.png|width=910,height=669!
>
> h2.
--
This message was sent by Atlassian Jira
(v8.20.1#820001)