[https://issues.apache.org/jira/browse/FLINK-37188?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17931131#comment-17931131]

Yang Li commented on FLINK-37188:
---------------------------------

*Cross-team verification for FLINK-37133 "Submitting Refresh Job of 
Materialized Table to Yarn/K8s"*

*Conclusion*

The overall behavior meets expectations.

*Yarn*

*Yarn-Application*

*Start Hadoop Cluster*
|Java
export END_TO_END_DIR=flink/flink-end-to-end-tests/
docker compose -f "${END_TO_END_DIR}/test-scripts/docker-hadoop-secure-cluster/docker-compose.yml" up -d|
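
To confirm the containers actually came up before continuing, a quick check against the same compose file (not part of the original run):
|Java
# list the secure Hadoop cluster containers and their state
docker compose -f "${END_TO_END_DIR}/test-scripts/docker-hadoop-secure-cluster/docker-compose.yml" ps|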

*Prepare Flink Environment*

Build Flink
|Java
mvn clean install -DskipTests -Dfast|
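
The packaging step below reads the binary distribution from the build output; a hedged sanity check that it is where the tar command expects:
|Java
# the tar step below packages this directory; confirm the build produced it
ls flink/flink-dist/target/flink-2.0-SNAPSHOT-bin/flink-2.0-SNAPSHOT/bin/flink|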

Copy Flink into the Hadoop Docker container
|Java
tar czf flink.tar.gz -C flink/flink-dist/target/flink-2.0-SNAPSHOT-bin/flink-2.0-SNAPSHOT/ .
docker cp flink.tar.gz master:/home/hadoop-user/
docker exec master bash -c "tar xzf /home/hadoop-user/flink.tar.gz --directory /home/hadoop-user/"
# copy OpenJDK 17 to the image
docker cp openlogic-openjdk-17.0.7+7-linux-x64.tar.gz master:/home/hadoop-user/|

*Prepare Hadoop/Flink configuration*
|Java
docker exec -it master bash
cd /home/hadoop-user/
tar -xzf openlogic-openjdk-17.0.7+7-linux-x64.tar.gz
export JAVA_HOME=./openlogic-openjdk-17.0.7+7-linux-x64/
export FLINK_HOME=./
export FLINK_CONF_DIR=./conf
export HADOOP_CLASSPATH=$($HADOOP_HOME/bin/hadoop classpath)|
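
The sql-client below attaches to a SQL Gateway at 127.0.0.1:8083, so the gateway has to be running first. A minimal sketch of starting it (the address and port are assumptions chosen to match the client command below):
|Java
# start the SQL Gateway that the sql-client will connect to
./bin/sql-gateway.sh start -Dsql-gateway.endpoint.rest.address=127.0.0.1 -Dsql-gateway.endpoint.rest.port=8083|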

*Continuous Mode Materialized Table*

Start sql-client in yarn-application mode
|Java
./bin/sql-client.sh gateway --endpoint http://127.0.0.1:8083
Flink SQL> SET 'execution.mode' = 'yarn-application';
[INFO] Execute statement succeeded.|

Create catalog
|Java
CREATE CATALOG mt_cat WITH (
  'type' = 'test-filesystem',
  'path' = '/home/hadoop-user/liyang-test/catalog_path',
  'default-database' = 'mydb'
);|

Create source
|Java
USE CATALOG mt_cat;

CREATE TABLE json_source (
  order_id BIGINT,
  user_id BIGINT,
  user_name STRING,
  order_created_at STRING,
  payment_amount_cents BIGINT
) WITH (
  'format' = 'json',
  'source.monitor-interval' = '10s'
);

SET 'execution.checkpointing.savepoint-dir' = 'file:///home/hadoop-user/liyang-test/savepoints_path';|

Create materialized table
|Java
Flink SQL> CREATE MATERIALIZED TABLE continuous_users_shops
> PARTITIONED BY (ds)
> WITH (
>   'format' = 'debezium-json',
>   'sink.rolling-policy.rollover-interval' = '10s',
>   'sink.rolling-policy.check-interval' = '10s'
> )
> FRESHNESS = INTERVAL '30' SECOND
> AS SELECT
>   user_id,
>   ds,
>   SUM (payment_amount_cents) AS payed_buy_fee_sum,
>   SUM (1) AS PV
> FROM (
>   SELECT user_id, order_created_at AS ds, payment_amount_cents
>     FROM json_source
>   ) AS tmp
> GROUP BY user_id, ds;|
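
As a hedged cross-check (not captured in the original session), the continuous refresh job should now be visible as a running YARN application:
|Java
# the continuous refresh job should appear as a RUNNING YARN application
$HADOOP_HOME/bin/yarn application -list -appStates RUNNING|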

Suspend, then insert a value into the source table, and resume
|Java
Flink SQL> ALTER MATERIALIZED TABLE continuous_users_shops SUSPEND;
[INFO] Execute statement succeeded.|

 
|Java
Flink SQL> INSERT INTO json_source VALUES (1010, 100, 'user1', '2024-06-19', 10);
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: edc07df938da0b9a3b9b4615e9f754db|

 
|Java
Flink SQL> ALTER MATERIALIZED TABLE continuous_users_shops RESUME;
[INFO] Execute statement succeeded.
Flink SQL> select * from continuous_users_shops;
[INFO] Result retrieval cancelled.|

The result is as expected.

!image-2025-02-27-19-59-28-286.png|width=707,height=120!

*Full Mode Materialized Table*

Create table in full mode
|Java
Flink SQL> CREATE MATERIALIZED TABLE full_users_shops
> PARTITIONED BY (ds)
> WITH (
>   'format' = 'json',
>   'partition.fields.ds.date-formatter' = 'yyyy-MM-dd'
> )
> FRESHNESS = INTERVAL '1' MINUTE
> REFRESH_MODE = FULL
> AS SELECT
>   user_id,
>   ds,
>   SUM (payment_amount_cents) AS payed_buy_fee_sum,
>   SUM (1) AS PV
> FROM (
>   SELECT user_id, order_created_at AS ds, payment_amount_cents
>   FROM json_source
> ) AS tmp
> GROUP BY user_id, ds;
[INFO] Execute statement succeeded.|
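
Since full mode runs each refresh as a separate short-lived batch job, finished refreshes should accumulate as completed YARN applications; a hedged way to observe this (not part of the original log):
|Java
# each periodic full refresh should leave behind a FINISHED YARN application
$HADOOP_HOME/bin/yarn application -list -appStates FINISHED|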

Suspend, then insert values into the source table, and resume
|Java
Flink SQL> ALTER MATERIALIZED TABLE full_users_shops SUSPEND;
[INFO] Execute statement succeeded.

Flink SQL> INSERT INTO json_source VALUES 
>   (1001, 1, 'user1', CAST(CURRENT_DATE AS STRING), 10),
>   (1002, 2, 'user2', CAST(CURRENT_DATE AS STRING), 20),
>   (1003, 3, 'user3', CAST(CURRENT_DATE AS STRING), 30),
>   (1004, 4, 'user4', CAST(CURRENT_DATE AS STRING), 40);

Flink SQL> ALTER MATERIALIZED TABLE full_users_shops RESUME;
[INFO] Execute statement succeeded.

Flink SQL> select * from full_users_shops;
[INFO] Result retrieval cancelled.|

The values selected from full_users_shops are as expected.

!image-2025-02-27-19-59-44-183.png|width=668,height=97!

*Yarn-Session*

Switch the sql-client to yarn-session mode
|Java
Flink SQL> SET 'execution.mode' = 'yarn-session';
[INFO] Execute statement succeeded.|

*Continuous Mode Materialized Table*

Create the materialized table in the same way as in yarn-application mode, then suspend and resume it
|Java
Flink SQL> ALTER MATERIALIZED TABLE continuous_users_shops SUSPEND;
[INFO] Execute statement succeeded.

Flink SQL> ALTER MATERIALIZED TABLE continuous_users_shops RESUME;
[INFO] Execute statement succeeded.

Flink SQL> INSERT INTO json_source VALUES (1010, 300, 'user1', '2024-06-19', 10);
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 1cbf3f45d766287b99846fbc97d961ac|

I can read the data from continuous_users_shops; it works.

!image-2025-02-27-20-00-01-252.png|width=697,height=73!

*Full Mode Materialized Table*
|Java
Flink SQL> ALTER MATERIALIZED TABLE full_users_shops SUSPEND;
[INFO] Execute statement succeeded.

Flink SQL> ALTER MATERIALIZED TABLE full_users_shops RESUME;
[INFO] Execute statement succeeded.

Flink SQL> INSERT INTO json_source VALUES 
>   (1005, 5, 'user5', CAST(CURRENT_DATE AS STRING), 10),
>   (1006, 6, 'user6', CAST(CURRENT_DATE AS STRING), 20),
>   (1007, 7, 'user7', CAST(CURRENT_DATE AS STRING), 30),
>   (1008, 8, 'user8', CAST(CURRENT_DATE AS STRING), 40);
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 40035f01c8991ebfa25716c249bac816

Flink SQL> select * from full_users_shops;
[INFO] Result retrieval cancelled.|

I can read the data from full_users_shops; it works.

!image-2025-02-27-20-00-18-975.png|width=714,height=107!

*K8s*

*Environment Preparation*

Start minikube as k8s cluster
|Java
minikube start|

Prepare S3 environment
|Java
docker run -d -P \
  --mount type=bind,source="./s3_test_data",target=/data \
  -e "MINIO_ACCESS_KEY=access_key" \
  -e "MINIO_SECRET_KEY=secret_key" \
  -e "MINIO_DOMAIN=localhost" \
  minio/minio server /data

# docker ps to find the container ID
export S3_ENDPOINT="http://$(docker port "890eb041caef" 9000 | grep -F '0.0.0.0' | sed 's/0\.0\.0\.0/localhost/')"

# replace localhost with the local IP
S3_ENDPOINT=$(echo $S3_ENDPOINT | sed 's/localhost/10.189.47.244/')

mc alias set myminio $S3_ENDPOINT access_key secret_key
mc cp README.txt myminio/test-bucket/liyant-test/catalog_store_path/.emptyfile
mc cp README.txt myminio/test-bucket/liyant-test/catalog_path/.emptyfile
mc cp README.txt myminio/test-bucket/liyant-test/catalog_path/mydb/.emptyfile
mc cp README.txt myminio/test-bucket/liyant-test/checkpoints_path/.emptyfile
mc cp README.txt myminio/test-bucket/liyant-test/savepoints_path/.emptyfile
mc rm myminio/test-bucket/liyant-test/catalog_store_path/.emptyfile
mc rm myminio/test-bucket/liyant-test/catalog_path/.emptyfile
mc rm myminio/test-bucket/liyant-test/catalog_path/mydb/.emptyfile
mc rm myminio/test-bucket/liyant-test/checkpoints_path/.emptyfile
mc rm myminio/test-bucket/liyant-test/savepoints_path/.emptyfile
mc cp README.txt myminio/test-bucket/liyant-test/catalog_path/mydb/.emptyfile|
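
A hedged sanity check that the directory skeleton exists before pointing Flink at it:
|Java
# list everything under the test prefix to confirm the layout
mc ls --recursive myminio/test-bucket/liyant-test/|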

Prepare flink/lib
|Java
flink-cep-2.0-SNAPSHOT.jar
flink-connector-files-2.0-SNAPSHOT.jar
flink-csv-2.0-SNAPSHOT.jar
flink-dist-2.0-SNAPSHOT.jar
flink-json-2.0-SNAPSHOT.jar
flink-migration-test-utils-2.0-SNAPSHOT.jar
flink-s3-fs-hadoop-2.0-SNAPSHOT.jar
flink-scala_2.12-2.0-SNAPSHOT.jar
flink-table-api-java-uber-2.0-SNAPSHOT.jar
flink-table-filesystem-test-utils-2.0-SNAPSHOT.jar
flink-table-planner-loader-2.0-SNAPSHOT.jar
flink-table-runtime-2.0-SNAPSHOT.jar
log4j-1.2-api-2.24.1.jar
log4j-api-2.24.1.jar
log4j-core-2.24.1.jar
log4j-slf4j-impl-2.24.1.jar|

Prepare flink/config.yaml
|Java
execution:
  checkpoints:
    dir: s3://test-bucket/liyant-test/checkpoints_path

# Configure file catalog store
table:
  catalog-store:
    kind: file
    file:
      path: 's3://test-bucket/liyant-test/catalog_store_path'

# Configure embedded scheduler
workflow-scheduler:
  type: embedded

# Configure SQL gateway address and port
sql-gateway:
  endpoint:
    rest:
      address: 127.0.0.1
      port: 8083

s3.access-key: access_key
s3.secret-key: secret_key
s3.endpoint: http://10.189.47.244:32770|
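
Because jobs in the cluster will talk to this endpoint, it is worth a quick reachability check; the sketch below uses MinIO's liveness probe and assumes the endpoint from the config above:
|Java
# expect HTTP 200 if MinIO is reachable from this host
curl -I http://10.189.47.244:32770/minio/health/live|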

Prepare dockerfile
|Java

FROM flink:2.0-scala_2.12-java17

ARG FLINK_PATH=/opt/flink
ARG LOCAL_FLINK_PATH=flink-2.0-SNAPSHOT

RUN rm -rf $FLINK_PATH/lib
RUN rm -rf $FLINK_PATH/opt
RUN rm -rf $FLINK_PATH/bin

COPY $LOCAL_FLINK_PATH/lib $FLINK_PATH/lib
COPY $LOCAL_FLINK_PATH/opt $FLINK_PATH/opt
COPY $LOCAL_FLINK_PATH/bin $FLINK_PATH/bin
COPY $LOCAL_FLINK_PATH/conf/config.yaml $FLINK_PATH/conf/config.yaml

RUN chown -R flink:flink $FLINK_PATH|

Create the image based on the official image
|Java
docker build -t test_kubernetes_application-1:latest .|

Load it into the minikube cluster
|Java
docker image save test_kubernetes_application-1:latest -o test_kubernetes_application-1
minikube image load test_kubernetes_application-1|
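
A hedged check that the image actually landed inside minikube:
|Java
# the image must be visible to minikube before pods can use it
minikube image ls | grep test_kubernetes_application-1|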

*k8s - Application*

Start the Flink SQL client, connect it to the SQL Gateway running on localhost 
at port 8083, and configure it to deploy jobs in kubernetes-application mode on 
the Kubernetes cluster.
|Java
./bin/sql-client.sh gateway --endpoint 127.0.0.1:8083 \
-Dexecution.target=kubernetes-application \
-Dkubernetes.container.image.ref=test_kubernetes_application-1:latest|

Repeat the same steps used for the YARN validation.

*Continuous Mode Materialized Table*
|Java
Flink SQL> set 'execution.checkpointing.savepoint-dir' = 's3://test-bucket/liyant-test/savepoints_path';
Flink SQL> set 'kubernetes.rest-service.exposed.type' = 'NodePort';
Flink SQL> CREATE CATALOG mt_cat WITH ('type' = 'test-filesystem', 'path' = 's3://test-bucket/liyant-test/catalog_path', 'default-database' = 'mydb');
Flink SQL> use catalog mt_cat;|

Create continuous materialized table.
|Java
CREATE TABLE test_source (a VARCHAR) WITH ('connector' = 'datagen');

CREATE MATERIALIZED TABLE continuous_users_shops
WITH (
  'format' = 'debezium-json',
  'sink.rolling-policy.rollover-interval' = '10s',
  'sink.rolling-policy.check-interval' = '10s'
)
FRESHNESS = INTERVAL '30' SECOND
AS SELECT a FROM test_source;|

The validation results are shown below.
|Java
Flink SQL> alter MATERIALIZED TABLE continuous_users_shops suspend;
[INFO] Execute statement succeeded.

Flink SQL> alter MATERIALIZED TABLE continuous_users_shops resume;
[INFO] Execute statement succeeded.|

The job restored from the savepoint successfully.

!image-2025-02-27-20-01-09-371.png|width=552,height=132!
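
As an extra hedged check, the suspend should have written a savepoint under the configured S3 path:
|Java
# the SUSPEND should have produced a savepoint directory under this prefix
mc ls --recursive myminio/test-bucket/liyant-test/savepoints_path/|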

*Full Mode Materialized Table*

Set up the session first.
|Java
Flink SQL> set 'kubernetes.rest-service.exposed.type' = 'NodePort';
Flink SQL> use catalog mt_cat;|

Create the full mode materialized table.
|Java
Flink SQL> CREATE TABLE json_source (
>   order_id BIGINT,
>   user_id BIGINT,
>   user_name STRING,
>   order_created_at STRING,
>   payment_amount_cents BIGINT
> ) WITH ('format' = 'json', 'source.monitor-interval' = '10s');
[INFO] Execute statement succeeded.

Flink SQL> CREATE MATERIALIZED TABLE full_users_shops2
> PARTITIONED BY (ds)
> WITH (
>   'format' = 'json',
>   'partition.fields.ds.date-formatter' = 'yyyy-MM-dd'
> )
> FRESHNESS = INTERVAL '1' MINUTE
> REFRESH_MODE = FULL
> AS SELECT
>   user_id,
>   ds,
>   SUM (payment_amount_cents) AS payed_buy_fee_sum,
>   SUM (1) AS PV
> FROM (
>   SELECT user_id, order_created_at AS ds, payment_amount_cents
>   FROM json_source
> ) AS tmp
> GROUP BY user_id, ds;|

The validation results are shown below.
|Java
Flink SQL> alter MATERIALIZED TABLE full_users_shops2 suspend;
[INFO] Execute statement succeeded.

Flink SQL> alter MATERIALIZED TABLE full_users_shops2 resume;
[INFO] Execute statement succeeded.|

The batch job can be scheduled.

!image-2025-02-27-20-01-25-183.png|width=680,height=235!
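
In kubernetes-application mode each full refresh is submitted as its own short-lived cluster, so one hedged way to watch refresh jobs come and go is:
|Java
# watch JobManager/TaskManager pods appear and terminate as refreshes run
kubectl get pods -w|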

*k8s - Session*

Deploy Session
|Java
./bin/kubernetes-session.sh -Dkubernetes.cluster-id=my-first-flink-cluster \
-Dkubernetes.rest-service.exposed.type=NodePort \
-Dkubernetes.container.image.ref=test_kubernetes_application-1:latest|
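
Once the session cluster is up, the REST endpoint's NodePort can be looked up; a hedged check, assuming Flink's convention of naming the REST service <cluster-id>-rest:
|Java
# find the NodePort assigned to the session cluster's REST service
kubectl get svc my-first-flink-cluster-rest|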

Start the Flink SQL client, connect it to the SQL Gateway on localhost at port 
8083, and configure it to submit jobs in kubernetes-session mode to the session 
cluster deployed above.
|Java
./bin/sql-client.sh gateway --endpoint 127.0.0.1:8083 \
-Dexecution.target=kubernetes-session \
-Dkubernetes.container.image.ref=test_kubernetes_application-1:latest

Flink SQL> set 'execution.checkpointing.savepoint-dir' = 's3://test-bucket/liyant-test/savepoints_path';
Flink SQL> set 'kubernetes.rest-service.exposed.type' = 'NodePort';
Flink SQL> set 'kubernetes.cluster-id' = 'my-first-flink-cluster';
Flink SQL> use catalog mt_cat;|

*Continuous Mode Materialized Table*
|Java
Flink SQL> alter MATERIALIZED table continuous_users_shops suspend;
[INFO] Execute statement succeeded.

Flink SQL> alter MATERIALIZED table continuous_users_shops resume;
[INFO] Execute statement succeeded.|

 
After the resume, a new TaskManager pod is started (highlighted below):
|Java
kubectl get pods
NAME                                     READY   STATUS    RESTARTS   AGE
my-first-flink-cluster-8d54f8df4-bfbdh   1/1     Running   0          26m
*my-first-flink-cluster-taskmanager-1-3   1/1     Running   0          5s*|

*Full Mode Materialized Table*
|Java
Flink SQL> alter MATERIALIZED TABLE full_users_shops suspend;
[INFO] Execute statement succeeded.

Flink SQL> alter MATERIALIZED TABLE full_users_shops resume;
[INFO] Execute statement succeeded.|

The batch job can be scheduled.
|Java
NAME                                     READY   STATUS    RESTARTS   AGE
my-first-flink-cluster-8d54f8df4-cdmwn   1/1     Running   0          2m24s
*my-first-flink-cluster-taskmanager-1-1   1/1     Running   0          10s*
*my-first-flink-cluster-taskmanager-1-2   1/1     Running   0          10s*
*my-first-flink-cluster-taskmanager-1-3   1/1     Running   0          10s*
*my-first-flink-cluster-taskmanager-1-4   1/1     Running   0          10s*
*my-first-flink-cluster-taskmanager-1-5   1/1     Running   0          9s*|

 

> Cross-team verification for FLINK-37133 "Submitting Refresh Job of 
> Materialized Table to Yarn/K8s"
> --------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-37188
>                 URL: https://issues.apache.org/jira/browse/FLINK-37188
>             Project: Flink
>          Issue Type: Sub-task
>            Reporter: Feng Jin
>            Assignee: Yang Li
>            Priority: Major
>             Fix For: 2.0.0
>
>         Attachments: image-2025-02-27-19-55-42-747.png, 
> image-2025-02-27-19-59-28-286.png, image-2025-02-27-19-59-44-183.png, 
> image-2025-02-27-20-00-01-252.png, image-2025-02-27-20-00-18-975.png, 
> image-2025-02-27-20-01-09-371.png, image-2025-02-27-20-01-25-183.png
>
>
> *Operation steps:*
> 1. Prepare cluster environment including k8s/yarn. 
> 2. Refer to 
> quickstart([https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/materialized-table/quickstart/])
>  to prepare the environment required for Materialized table: including 
> catalog store/test-filesystem plugin/sql gateway.
> 3. Create Materialized Tables in two modes (Continuous and Full) in different 
> deployment modes (yarn-session, yarn-application, kubernetes-session, 
> kubernetes-application)
> 4. ALTER the materialized table to SUSPEND/RESUME it
>  
> *Verify The result:*
>  Verify that the creation, suspension, and resumption of the Materialized 
> table can be performed normally across different environments.


