[
https://issues.apache.org/jira/browse/FLINK-25560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Bruce Wong updated FLINK-25560:
-------------------------------
Attachment: image-2022-01-11-20-02-17-780.png
> Add "sink.delete.mode" in HBase sql connector for retracting the latest
> version or all versions in changelog mode
> -----------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-25560
> URL: https://issues.apache.org/jira/browse/FLINK-25560
> Project: Flink
> Issue Type: Improvement
> Components: Connectors / HBase
> Reporter: Bruce Wong
> Assignee: Jing Ge
> Priority: Major
> Labels: pull-request-available
> Attachments: image-2022-01-11-20-02-17-780.png
>
>
> h1. Motivation
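> When we synchronize data from MySQL to HBase, we find that when a row is
> deleted in MySQL, HBase deletes only the latest version of the corresponding
> cells instead of all versions, so an older version may still be read
> afterwards, which leads to incorrect semantics. We therefore want to add a
> parameter that controls whether a delete removes only the latest version or
> all versions.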
> h1. Usage
> The following test job reads a Debezium changelog from Kafka and writes it to
> HBase with 'sink.delete.mode' = 'all-versions'.
> {code:java}
> package com.bruce;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.TableConfig;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
> import static
> org.apache.flink.configuration.ConfigConstants.LOCAL_START_WEBSERVER;
> public class KafkaToHBase {
>
>     /**
>      * Runs a local streaming job that reads a Debezium-JSON changelog from the Kafka topic
>      * {@code dim-bundles} and writes it into the HBase table {@code dim_hbase}, configured
>      * with {@code 'sink.delete.mode' = 'all-versions'} (the option proposed by this issue).
>      *
>      * <p>The method blocks until the INSERT job terminates, because {@code executeSql}
>      * only submits the job. Without waiting, {@code main} would return right after
>      * submission.
>      *
>      * @param args unused
>      * @throws IllegalStateException if the INSERT job fails
>      */
>     public static void main(String[] args) {
>         Configuration cfg = new Configuration();
>         cfg.setBoolean(LOCAL_START_WEBSERVER, true);
>         StreamExecutionEnvironment env =
>                 StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(cfg);
>         env.setParallelism(1);
>         EnvironmentSettings envSettings =
>                 EnvironmentSettings.newInstance().inStreamingMode().build();
>         StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, envSettings);
>
>         // `desc` must be back-quoted: DESC is a reserved SQL keyword, so the bare
>         // identifier fails to parse (the sink DDL and the DML already quote it).
>         String source = "CREATE TEMPORARY TABLE IF NOT EXISTS kafka_llspay_bundles(\n" +
>                 " id STRING,\n" +
>                 " category_id STRING,\n" +
>                 " upc STRING,\n" +
>                 " `name` STRING,\n" +
>                 " price_cents STRING,\n" +
>                 " original_price_cents STRING,\n" +
>                 " short_desc STRING,\n" +
>                 " `desc` STRING,\n" +
>                 " cover_url STRING,\n" +
>                 " created_at STRING,\n" +
>                 " updated_at STRING,\n" +
>                 " deleted_at STRING,\n" +
>                 " extra STRING,\n" +
>                 " status STRING,\n" +
>                 " scholarship_cents STRING,\n" +
>                 " is_payback STRING,\n" +
>                 " is_support_iap STRING,\n" +
>                 " iap_product_id STRING,\n" +
>                 " neo_product_code STRING,\n" +
>                 " paid_redirect_url STRING,\n" +
>                 " subscription_type STRING\n" +
>                 ") WITH (\n" +
>                 " 'connector' = 'kafka',\n" +
>                 " 'topic' = 'dim-bundles',\n" +
>                 " 'properties.bootstrap.servers' = 'localhost:9092',\n" +
>                 " 'properties.group.id' = 'vvp_dev',\n" +
>                 " 'scan.startup.mode' = 'latest-offset',\n" +
>                 " 'value.debezium-json.schema-include' = 'true',\n" +
>                 " 'value.format' = 'debezium-json',\n" +
>                 " 'value.debezium-json.ignore-parse-errors' = 'true'\n" +
>                 ")";
>
>         // All non-key columns live in the single column family `cf`.
>         // max-rows = 1 flushes after every row, so each change (including deletes)
>         // is written to HBase immediately.
>         String sink = "CREATE TEMPORARY TABLE IF NOT EXISTS dim_hbase (\n" +
>                 " rowkey STRING,\n" +
>                 " cf ROW<id STRING, category_id STRING, upc STRING, `name` STRING," +
>                 " price_cents STRING, original_price_cents STRING, short_desc STRING," +
>                 " `desc` STRING, cover_url STRING, created_at STRING, updated_at STRING," +
>                 " deleted_at STRING, extra STRING, status STRING, scholarship_cents STRING," +
>                 " is_payback STRING, is_support_iap STRING, iap_product_id STRING," +
>                 " neo_product_code STRING, paid_redirect_url STRING," +
>                 " subscription_type STRING>\n" +
>                 ") with (\n" +
>                 " 'connector'='hbase-2.2',\n" +
>                 " 'table-name'='dim_hbase',\n" +
>                 " 'zookeeper.quorum'='localhost:2181',\n" +
>                 " 'sink.buffer-flush.max-size' = '0',\n" +
>                 " 'sink.buffer-flush.max-rows' = '1',\n" +
>                 " 'sink.delete.mode' = 'all-versions'\n" +
>                 ")";
>
>         // `upc` becomes the HBase row key; every source column is packed into `cf`.
>         String dml = "INSERT INTO dim_hbase\n" +
>                 "SELECT \n" +
>                 " upc as rowkey,\n" +
>                 " ROW(\n" +
>                 " id, category_id, upc, `name`, price_cents, original_price_cents," +
>                 " short_desc, `desc`, cover_url, created_at, updated_at, deleted_at," +
>                 " extra, status, scholarship_cents, is_payback, is_support_iap," +
>                 " iap_product_id, neo_product_code, paid_redirect_url," +
>                 " subscription_type)\n" +
>                 "FROM kafka_llspay_bundles";
>
>         tEnv.executeSql(source);
>         tEnv.executeSql(sink);
>         try {
>             // executeSql only submits the INSERT job; wait for it so main() does not
>             // return while the job is still running.
>             tEnv.executeSql(dml).await();
>         } catch (InterruptedException e) {
>             Thread.currentThread().interrupt();
>         } catch (java.util.concurrent.ExecutionException e) {
>             throw new IllegalStateException("HBase sink job failed", e);
>         }
>     }
> } {code}
> h1. Test
>
> h1. Reference
> For background, see the related issue:
> FLINK-25330
>
--
This message was sent by Atlassian Jira
(v8.20.1#820001)