[ https://issues.apache.org/jira/browse/FLINK-25330?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17464946#comment-17464946 ]

Bruce Wong commented on FLINK-25330:
------------------------------------

Hi, [~jingge] 

Thanks for your reply. I have updated the test data (bundle_data1.zip), and the new test 
result is test_res2.png. The test results are the same as before.
The test code and configuration are as follows.

 
{code:java}
package com.bruce;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import static org.apache.flink.configuration.ConfigConstants.LOCAL_START_WEBSERVER;

public class KafkaToHBase {
    public static void main(String[] args) throws Exception {
        // Run on a local mini-cluster with the Flink web UI enabled.
        Configuration cfg = new Configuration();
        cfg.setBoolean(LOCAL_START_WEBSERVER, true);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(cfg);
        env.setParallelism(1);
        EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, envSettings);
        TableConfig config = tEnv.getConfig();
//        config.setIdleStateRetention(Duration.ofHours(2));

        // Source: Kafka topic carrying the Debezium CDC changelog of the bundles table.
        String source = "CREATE TEMPORARY TABLE IF NOT EXISTS kafka_llspay_bundles(\n" +
                "  id                    STRING,\n" +
                "  category_id           STRING,\n" +
                "  upc                   STRING,\n" +
                "  `name`                STRING,\n" +
                "  price_cents           STRING,\n" +
                "  original_price_cents  STRING,\n" +
                "  short_desc            STRING,\n" +
                "  `desc`                STRING,\n" +
                "  cover_url             STRING,\n" +
                "  created_at            STRING,\n" +
                "  updated_at            STRING,\n" +
                "  deleted_at            STRING,\n" +
                "  extra                 STRING,\n" +
                "  status                STRING,\n" +
                "  scholarship_cents     STRING,\n" +
                "  is_payback            STRING,\n" +
                "  is_support_iap        STRING,\n" +
                "  iap_product_id        STRING,\n" +
                "  neo_product_code      STRING,\n" +
                "  paid_redirect_url     STRING,\n" +
                "  subscription_type     STRING\n" +
                ") WITH (\n" +
                "  'connector' = 'kafka',\n" +
                "  'topic' = 'dim-bundles',\n" +
                "  'properties.bootstrap.servers' = 'localhost:9092',\n" +
                "  'properties.group.id' = 'vvp_dev',\n" +
                "  'scan.startup.mode' = 'latest-offset',\n" +
                "  'value.debezium-json.schema-include' = 'true',\n" +
                "  'value.format' = 'debezium-json',\n" +
                "  'value.debezium-json.ignore-parse-errors' = 'true'\n" +
                ")";
        // Sink: HBase 2.2 table with a single column family 'cf';
        // buffer flushing is disabled so every mutation is sent immediately.
        String sink = "CREATE TEMPORARY TABLE IF NOT EXISTS dim_hbase (\n" +
                "    rowkey      STRING,\n" +
                "    cf ROW<id STRING, category_id STRING, upc STRING, `name` STRING,\n" +
                "      price_cents STRING, original_price_cents STRING, short_desc STRING,\n" +
                "      `desc` STRING, cover_url STRING, created_at STRING, updated_at STRING,\n" +
                "      deleted_at STRING, extra STRING, status STRING, scholarship_cents STRING,\n" +
                "      is_payback STRING, is_support_iap STRING, iap_product_id STRING,\n" +
                "      neo_product_code STRING, paid_redirect_url STRING, subscription_type STRING>\n" +
                ") WITH (\n" +
                "  'connector'='hbase-2.2',\n" +
                "  'table-name'='dim_hbase',\n" +
                "  'zookeeper.quorum'='localhost:2181',\n" +
                "  'sink.buffer-flush.max-size' = '0',\n" +
                "  'sink.buffer-flush.max-rows' = '1'\n" +
                ")";
        // Apply the Debezium changelog to HBase keyed by upc: updates become
        // upserts and deletes become HBase delete mutations on the rowkey.
        String dml = "INSERT INTO dim_hbase\n" +
                "SELECT\n" +
                "  upc AS rowkey,\n" +
                "  ROW(\n" +
                "    id, category_id, upc, `name`, price_cents, original_price_cents,\n" +
                "    short_desc, `desc`, cover_url, created_at, updated_at, deleted_at,\n" +
                "    extra, status, scholarship_cents, is_payback, is_support_iap,\n" +
                "    iap_product_id, neo_product_code, paid_redirect_url, subscription_type)\n" +
                "FROM kafka_llspay_bundles";
        tEnv.executeSql(source);
        tEnv.executeSql(sink);
        // executeSql submits the INSERT job asynchronously; await() keeps the
        // local job running instead of exiting main() immediately.
        tEnv.executeSql(dml).await();
    }
}
{code}
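
For reference, the behaviour looks like the difference between the two delete flavours in the raw HBase client API: {{Delete#addColumn}} removes only the most recent version of a cell, while {{Delete#addColumns}} removes all versions. Below is a minimal sketch against a plain HBase 2.x client, with no Flink involved, that shows the older version resurfacing; the table/column names and the assumption that the old cell has not yet been compacted away are mine, not taken from the connector code.

{code:java}
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class HBaseDeleteVersionsDemo {
    public static void main(String[] args) throws Exception {
        byte[] row = Bytes.toBytes("some-upc");   // hypothetical rowkey
        byte[] cf = Bytes.toBytes("cf");
        byte[] col = Bytes.toBytes("name");
        try (Connection conn = ConnectionFactory.createConnection(HBaseConfiguration.create());
             Table table = conn.getTable(TableName.valueOf("dim_hbase"))) {
            // Two writes create two versions of the same cell.
            table.put(new Put(row).addColumn(cf, col, Bytes.toBytes("v1")));
            table.put(new Put(row).addColumn(cf, col, Bytes.toBytes("v2")));

            // addColumn (singular) deletes only the latest version, so the
            // older "v1" becomes visible again on the next read (assuming it
            // has not been compacted away yet).
            table.delete(new Delete(row).addColumn(cf, col));
            Result r = table.get(new Get(row));
            System.out.println(Bytes.toString(r.getValue(cf, col))); // prints "v1"

            // addColumns (plural) deletes all versions of the cell.
            table.delete(new Delete(row).addColumns(cf, col));
            System.out.println(table.get(new Get(row)).isEmpty()); // true
        }
    }
}
{code}

If the connector builds its delete mutations with the singular {{addColumn}}, that would match exactly what the screenshots show.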

> Flink SQL doesn't retract all versions of Hbase data
> ----------------------------------------------------
>
>                 Key: FLINK-25330
>                 URL: https://issues.apache.org/jira/browse/FLINK-25330
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / HBase
>            Reporter: Bruce Wong
>            Assignee: Jing Ge
>            Priority: Major
>              Labels: pull-request-available
>         Attachments: Flink-SQL-Test.zip, bundle_data.zip, bundle_data1.zip, 
> image-2021-12-15-20-05-18-236.png, test_res.png, test_res2.png, test_res_1.png
>
>
> h2. Background
> When we use CDC to synchronize MySQL data to HBase, we find that HBase 
> deletes only the latest version of the specified rowkey when the MySQL row 
> is deleted. Older versions of the data still exist, so downstream consumers 
> end up reading stale data. I think this is a bug in the HBase connector.
> The following figure shows the HBase data before and after the MySQL row is 
> deleted.
> !image-2021-12-15-20-05-18-236.png|width=910,height=669!
>  


