[ 
https://issues.apache.org/jira/browse/FLINK-25560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruce Wong updated FLINK-25560:
-------------------------------
    Description: 
h1. Motivation

When we synchronize data from MySQL to HBase, we find that when data is deleted 
in MySQL, HBase does not delete all versions of the corresponding cells, which 
leads to incorrect semantics. So we want to add a parameter to control whether 
only the latest version or all versions are deleted.
h1. Usage

The test code is as follows.
{code:java}
package com.bruce;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import static 
org.apache.flink.configuration.ConfigConstants.LOCAL_START_WEBSERVER;

public class KafkaToHBase {
    public static void main(String[] args) {
        Configuration cfg = new Configuration();
        cfg.setBoolean(LOCAL_START_WEBSERVER, true);
        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(cfg);
        env.setParallelism(1);
        EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, 
envSettings);
//        TableConfig config = tEnv.getConfig();
//        config.setIdleStateRetention(Duration.ofHours(2));

        String source = "CREATE TEMPORARY TABLE IF NOT EXISTS 
kafka_llspay_bundles(\n" +
                "  id                   STRING,\n" +
                "  category_id           STRING,\n" +
                "  upc                   STRING,\n" +
                "  `name`                STRING,\n" +
                "  price_cents           STRING,\n" +
                "  original_price_cents  STRING,\n" +
                "  short_desc            STRING,\n" +
                "  desc                  STRING,\n" +
                "  cover_url             STRING,\n" +
                "  created_at            STRING,\n" +
                "  updated_at            STRING,\n" +
                "  deleted_at            STRING,\n" +
                "  extra                 STRING,\n" +
                "  status                STRING,\n" +
                "  scholarship_cents     STRING,\n" +
                "  is_payback            STRING,\n" +
                "  is_support_iap        STRING,\n" +
                "  iap_product_id        STRING,\n" +
                "  neo_product_code      STRING,\n" +
                "  paid_redirect_url     STRING,\n" +
                "  subscription_type     STRING\n" +
                ") WITH (\n" +
                "  'connector' = 'kafka',\n" +
                "  'topic' = 'dim-bundles',\n" +
                "  'properties.bootstrap.servers' = 'localhost:9092',\n" +
                "  'properties.group.id' = 'vvp_dev',\n" +
                "  'scan.startup.mode' = 'latest-offset',\n" +
                "  'value.debezium-json.schema-include' = 'true',\n" +
                "  'value.format' = 'debezium-json',\n" +
                "  'value.debezium-json.ignore-parse-errors' = 'true'\n" +
                ")";
        String sink = "CREATE TEMPORARY TABLE IF NOT EXISTS dim_hbase (\n" +
                "    rowkey      STRING,\n" +
                "    cf ROW<id STRING, category_id STRING, upc STRING, `name` 
STRING, price_cents STRING, original_price_cents STRING, short_desc STRING, 
`desc` STRING, cover_url STRING, created_at STRING, updated_at STRING, 
deleted_at STRING, extra STRING, status STRING, scholarship_cents STRING, 
is_payback STRING, is_support_iap STRING, iap_product_id STRING, 
neo_product_code STRING, paid_redirect_url STRING, subscription_type STRING>\n" 
+
                ") with (\n" +
                "  'connector'='hbase-2.2',\n" +
                "  'table-name'='dim_hbase',\n" +
                "  'zookeeper.quorum'='localhost:2181',\n" +
                "  'sink.buffer-flush.max-size' = '0',\n" +
                "  'sink.buffer-flush.max-rows' = '1',\n" +
                "  'sink.delete.mode' = 'all-versions'\n" +
                ")";
        String dml = "INSERT INTO dim_hbase\n" +
                "SELECT \n" +
                "  upc as rowkey,\n" +
                "  ROW(\n" +
                "    id, category_id, upc, `name`, price_cents, 
original_price_cents, short_desc, `desc` , cover_url , created_at, updated_at, 
deleted_at, extra , status , scholarship_cents , is_payback , is_support_iap , 
iap_product_id , neo_product_code , paid_redirect_url , subscription_type)\n" +
                "FROM kafka_llspay_bundles";
        tEnv.executeSql(source);
        tEnv.executeSql(sink);
        tEnv.executeSql(dml);
    }
} {code}
h1. Test
 # After the test, we found that the deleted column family (CF) was the one 
specified in the Flink SQL DML statement, without affecting other CFs.

!image-2022-01-11-20-02-17-780.png|width=793,height=294!

 
h1. Reference

Please refer to the related task linked below.

FLINK-25330
 

  was:
h1. Motivation

When we synchronize data from MySQL to HBase, we find that when data is deleted 
in MySQL, HBase does not delete all versions of the corresponding cells, which 
leads to incorrect semantics. So we want to add a parameter to control whether 
only the latest version or all versions are deleted.
h1. Usage

The test code is as follows.
{code:java}
package com.bruce;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import static 
org.apache.flink.configuration.ConfigConstants.LOCAL_START_WEBSERVER;

public class KafkaToHBase {
    public static void main(String[] args) {
        Configuration cfg = new Configuration();
        cfg.setBoolean(LOCAL_START_WEBSERVER, true);
        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(cfg);
        env.setParallelism(1);
        EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, 
envSettings);
//        TableConfig config = tEnv.getConfig();
//        config.setIdleStateRetention(Duration.ofHours(2));

        String source = "CREATE TEMPORARY TABLE IF NOT EXISTS 
kafka_llspay_bundles(\n" +
                "  id                   STRING,\n" +
                "  category_id           STRING,\n" +
                "  upc                   STRING,\n" +
                "  `name`                STRING,\n" +
                "  price_cents           STRING,\n" +
                "  original_price_cents  STRING,\n" +
                "  short_desc            STRING,\n" +
                "  desc                  STRING,\n" +
                "  cover_url             STRING,\n" +
                "  created_at            STRING,\n" +
                "  updated_at            STRING,\n" +
                "  deleted_at            STRING,\n" +
                "  extra                 STRING,\n" +
                "  status                STRING,\n" +
                "  scholarship_cents     STRING,\n" +
                "  is_payback            STRING,\n" +
                "  is_support_iap        STRING,\n" +
                "  iap_product_id        STRING,\n" +
                "  neo_product_code      STRING,\n" +
                "  paid_redirect_url     STRING,\n" +
                "  subscription_type     STRING\n" +
                ") WITH (\n" +
                "  'connector' = 'kafka',\n" +
                "  'topic' = 'dim-bundles',\n" +
                "  'properties.bootstrap.servers' = 'localhost:9092',\n" +
                "  'properties.group.id' = 'vvp_dev',\n" +
                "  'scan.startup.mode' = 'latest-offset',\n" +
                "  'value.debezium-json.schema-include' = 'true',\n" +
                "  'value.format' = 'debezium-json',\n" +
                "  'value.debezium-json.ignore-parse-errors' = 'true'\n" +
                ")";
        String sink = "CREATE TEMPORARY TABLE IF NOT EXISTS dim_hbase (\n" +
                "    rowkey      STRING,\n" +
                "    cf ROW<id STRING, category_id STRING, upc STRING, `name` 
STRING, price_cents STRING, original_price_cents STRING, short_desc STRING, 
`desc` STRING, cover_url STRING, created_at STRING, updated_at STRING, 
deleted_at STRING, extra STRING, status STRING, scholarship_cents STRING, 
is_payback STRING, is_support_iap STRING, iap_product_id STRING, 
neo_product_code STRING, paid_redirect_url STRING, subscription_type STRING>\n" 
+
                ") with (\n" +
                "  'connector'='hbase-2.2',\n" +
                "  'table-name'='dim_hbase',\n" +
                "  'zookeeper.quorum'='localhost:2181',\n" +
                "  'sink.buffer-flush.max-size' = '0',\n" +
                "  'sink.buffer-flush.max-rows' = '1',\n" +
                "  'sink.delete.mode' = 'all-versions'\n" +
                ")";
        String dml = "INSERT INTO dim_hbase\n" +
                "SELECT \n" +
                "  upc as rowkey,\n" +
                "  ROW(\n" +
                "    id, category_id, upc, `name`, price_cents, 
original_price_cents, short_desc, `desc` , cover_url , created_at, updated_at, 
deleted_at, extra , status , scholarship_cents , is_payback , is_support_iap , 
iap_product_id , neo_product_code , paid_redirect_url , subscription_type)\n" +
                "FROM kafka_llspay_bundles";
        tEnv.executeSql(source);
        tEnv.executeSql(sink);
        tEnv.executeSql(dml);
    }
} {code}
h1. Test

 
h1. Reference

Please look at the following task link.

FLINK-25330
 


> Add "sink.delete.mode" in HBase sql connector for retracting the latest 
> version or all versions in changelog mode
> -----------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-25560
>                 URL: https://issues.apache.org/jira/browse/FLINK-25560
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / HBase
>            Reporter: Bruce Wong
>            Assignee: Jing Ge
>            Priority: Major
>              Labels: pull-request-available
>         Attachments: image-2022-01-11-20-02-17-780.png
>
>
> h1. Motivation
> When we synchronize data from mysql to HBase, we find that when deleting data 
> from mysql, HBase cannot delete all versions, which leads to incorrect 
> semantics. So we want to add a parameter to control deleting the latest 
> version or deleting all versions.
> h1. Usage
> The test code is as follows.
> {code:java}
> package com.bruce;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.TableConfig;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
> import static 
> org.apache.flink.configuration.ConfigConstants.LOCAL_START_WEBSERVER;
> public class KafkaToHBase {
>     public static void main(String[] args) {
>         Configuration cfg = new Configuration();
>         cfg.setBoolean(LOCAL_START_WEBSERVER, true);
>         StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(cfg);
>         env.setParallelism(1);
>         EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
>                 .inStreamingMode()
>                 .build();
>         StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, 
> envSettings);
> //        TableConfig config = tEnv.getConfig();
> //        config.setIdleStateRetention(Duration.ofHours(2));
>         String source = "CREATE TEMPORARY TABLE IF NOT EXISTS 
> kafka_llspay_bundles(\n" +
>                 "  id                   STRING,\n" +
>                 "  category_id           STRING,\n" +
>                 "  upc                   STRING,\n" +
>                 "  `name`                STRING,\n" +
>                 "  price_cents           STRING,\n" +
>                 "  original_price_cents  STRING,\n" +
>                 "  short_desc            STRING,\n" +
>                 "  desc                  STRING,\n" +
>                 "  cover_url             STRING,\n" +
>                 "  created_at            STRING,\n" +
>                 "  updated_at            STRING,\n" +
>                 "  deleted_at            STRING,\n" +
>                 "  extra                 STRING,\n" +
>                 "  status                STRING,\n" +
>                 "  scholarship_cents     STRING,\n" +
>                 "  is_payback            STRING,\n" +
>                 "  is_support_iap        STRING,\n" +
>                 "  iap_product_id        STRING,\n" +
>                 "  neo_product_code      STRING,\n" +
>                 "  paid_redirect_url     STRING,\n" +
>                 "  subscription_type     STRING\n" +
>                 ") WITH (\n" +
>                 "  'connector' = 'kafka',\n" +
>                 "  'topic' = 'dim-bundles',\n" +
>                 "  'properties.bootstrap.servers' = 'localhost:9092',\n" +
>                 "  'properties.group.id' = 'vvp_dev',\n" +
>                 "  'scan.startup.mode' = 'latest-offset',\n" +
>                 "  'value.debezium-json.schema-include' = 'true',\n" +
>                 "  'value.format' = 'debezium-json',\n" +
>                 "  'value.debezium-json.ignore-parse-errors' = 'true'\n" +
>                 ")";
>         String sink = "CREATE TEMPORARY TABLE IF NOT EXISTS dim_hbase (\n" +
>                 "    rowkey      STRING,\n" +
>                 "    cf ROW<id STRING, category_id STRING, upc STRING, `name` 
> STRING, price_cents STRING, original_price_cents STRING, short_desc STRING, 
> `desc` STRING, cover_url STRING, created_at STRING, updated_at STRING, 
> deleted_at STRING, extra STRING, status STRING, scholarship_cents STRING, 
> is_payback STRING, is_support_iap STRING, iap_product_id STRING, 
> neo_product_code STRING, paid_redirect_url STRING, subscription_type 
> STRING>\n" +
>                 ") with (\n" +
>                 "  'connector'='hbase-2.2',\n" +
>                 "  'table-name'='dim_hbase',\n" +
>                 "  'zookeeper.quorum'='localhost:2181',\n" +
>                 "  'sink.buffer-flush.max-size' = '0',\n" +
>                 "  'sink.buffer-flush.max-rows' = '1',\n" +
>                 "  'sink.delete.mode' = 'all-versions'\n" +
>                 ")";
>         String dml = "INSERT INTO dim_hbase\n" +
>                 "SELECT \n" +
>                 "  upc as rowkey,\n" +
>                 "  ROW(\n" +
>                 "    id, category_id, upc, `name`, price_cents, 
> original_price_cents, short_desc, `desc` , cover_url , created_at, 
> updated_at, deleted_at, extra , status , scholarship_cents , is_payback , 
> is_support_iap , iap_product_id , neo_product_code , paid_redirect_url , 
> subscription_type)\n" +
>                 "FROM kafka_llspay_bundles";
>         tEnv.executeSql(source);
>         tEnv.executeSql(sink);
>         tEnv.executeSql(dml);
>     }
> } {code}
> h1. Test
>  # After the test, we found that the deleted CF was the CF specified in the 
> Flink SQL DML statement, without affecting other CF.
> !image-2022-01-11-20-02-17-780.png|width=793,height=294!
>  
> h1. Reference
> Please look at the following task link.
> FLINK-25330
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to