[
https://issues.apache.org/jira/browse/FLINK-37188?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17931131#comment-17931131
]
Yang Li commented on FLINK-37188:
---------------------------------
*Cross-team verification for FLINK-37133 "Submitting Refresh Job of
Materialized Table to Yarn/K8s"*
*Conclusion*
The overall behavior meets expectations.
*Yarn*
*Yarn-Application*
*Start Hadoop Cluster*
|Java
export END_TO_END_DIR=flink/flink-end-to-end-tests/
docker compose -f
"${END_TO_END_DIR}/test-scripts/docker-hadoop-secure-cluster/docker-compose.yml"
up -d|
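Optionally, confirm the Hadoop containers are up before proceeding (a quick check):
|Java
docker compose -f "${END_TO_END_DIR}/test-scripts/docker-hadoop-secure-cluster/docker-compose.yml" ps|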
*Prepare Flink Environment*
Build Flink
|Java
mvn clean install -DskipTests -Dfast|
Copy Flink into the Hadoop Docker container
|Java
tar czf flink.tar.gz -C flink/flink-dist/target/flink-2.0-SNAPSHOT-bin/flink-2.0-SNAPSHOT/ .
docker cp flink.tar.gz master:/home/hadoop-user/
docker exec master bash -c "tar xzf /home/hadoop-user/flink.tar.gz --directory /home/hadoop-user/"
# copy OpenJDK 17 into the image
docker cp openlogic-openjdk-17.0.7+7-linux-x64.tar.gz master:/home/hadoop-user/|
*Prepare Hadoop/Flink configuration*
|Java
docker exec -it master bash
cd /home/hadoop-user/
tar -xzf openlogic-openjdk-17.0.7+7-linux-x64.tar.gz
export JAVA_HOME=./openlogic-openjdk-17.0.7+7-linux-x64/
export FLINK_HOME=./
export FLINK_CONF_DIR=./conf
export HADOOP_CLASSPATH=$($HADOOP_HOME/bin/hadoop classpath)|
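Before the SQL client can connect, the SQL gateway must be running with the materialized-table prerequisites from the quickstart configured. A minimal conf/config.yaml sketch, assuming the same settings as the K8s config shown later (the catalog store path is illustrative):
|Java
# conf/config.yaml (sketch; the catalog store path is illustrative)
table:
  catalog-store:
    kind: file
    file:
      path: 'file:///home/hadoop-user/liyang-test/catalog_store_path'
workflow-scheduler:
  type: embedded
sql-gateway:
  endpoint:
    rest:
      address: 127.0.0.1
      port: 8083|
Then start the gateway that sql-client connects to below:
|Java
./bin/sql-gateway.sh start|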
*Continuous Mode Materialized Table*
Start sql-client in yarn-application mode
|Java
./bin/sql-client.sh gateway --endpoint http://127.0.0.1:8083
Flink SQL> SET 'execution.mode' = 'yarn-application';
[INFO] Execute statement succeeded.|
Create catalog
|Java
CREATE CATALOG mt_cat WITH ('type' = 'test-filesystem','path' =
'/home/hadoop-user/liyang-test/catalog_path','default-database' = 'mydb');|
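The test-filesystem catalog type requires the test-filesystem connector on the gateway's classpath; a sketch, assuming the same jar as in the K8s lib listing later (the source location of the jar is an assumption):
|Java
# copy the test-filesystem connector jar (built from the Flink source tree) into lib/
cp flink-table-filesystem-test-utils-2.0-SNAPSHOT.jar $FLINK_HOME/lib/|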
Create source
|Java
USE CATALOG mt_cat;
CREATE TABLE json_source (
order_id BIGINT,
user_id BIGINT,
user_name STRING,
order_created_at STRING,
payment_amount_cents BIGINT) WITH ('format' =
'json','source.monitor-interval' = '10s');
SET 'execution.checkpointing.savepoint-dir' =
'file:///home/hadoop-user/liyang-test/savepoints_path';|
Create materialized table
|Java
Flink SQL> CREATE MATERIALIZED TABLE continuous_users_shops
> PARTITIONED BY (ds)
> WITH (
> 'format' = 'debezium-json',
> 'sink.rolling-policy.rollover-interval' = '10s',
> 'sink.rolling-policy.check-interval' = '10s'
> )
> FRESHNESS = INTERVAL '30' SECOND
> AS SELECT
> user_id,
> ds,
> SUM (payment_amount_cents) AS payed_buy_fee_sum,
> SUM (1) AS PV
> FROM (
> SELECT user_id, order_created_at AS ds, payment_amount_cents
> FROM json_source
> ) AS tmp
> GROUP BY user_id, ds;|
Suspend, insert a value into the source table, then resume
|Java
Flink SQL> ALTER MATERIALIZED TABLE continuous_users_shops SUSPEND;
[INFO] Execute statement succeeded.|
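While suspended, the continuous refresh job should disappear from the Yarn application list and reappear after the RESUME below; this can be cross-checked with the Yarn CLI (a sketch, assuming the Hadoop CLI is available inside the container):
|Java
# list running Yarn applications; the continuous refresh job should appear here once resumed
$HADOOP_HOME/bin/yarn application -list|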
|Java
Flink SQL> INSERT INTO json_source VALUES (1010, 100, 'user1', '2024-06-19',
10);
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: edc07df938da0b9a3b9b4615e9f754db|
|Java
Flink SQL> ALTER MATERIALIZED TABLE continuous_users_shops RESUME;
[INFO] Execute statement succeeded.
Flink SQL> select * from continuous_users_shops;
[INFO] Result retrieval cancelled.|
The result is as expected
!image-2025-02-27-19-59-28-286.png|width=707,height=120!
*Full Mode Materialized Table*
Create a materialized table in full mode
|Java
Flink SQL> CREATE MATERIALIZED TABLE full_users_shops
> PARTITIONED BY (ds)
> WITH (
> 'format' = 'json',
> 'partition.fields.ds.date-formatter' = 'yyyy-MM-dd'
> )
> FRESHNESS = INTERVAL '1' MINUTE
> REFRESH_MODE = FULL
> AS SELECT
> user_id,
> ds,
> SUM (payment_amount_cents) AS payed_buy_fee_sum,
> SUM (1) AS PV
> FROM (
> SELECT user_id, order_created_at AS ds, payment_amount_cents
> FROM json_source
> ) AS tmp
> GROUP BY user_id, ds;
[INFO] Execute statement succeeded.|
Suspend, insert values into the source table, then resume
|Java
Flink SQL> ALTER MATERIALIZED TABLE full_users_shops SUSPEND;
[INFO] Execute statement succeeded.
Flink SQL> INSERT INTO json_source VALUES
> (1001, 1, 'user1', CAST(CURRENT_DATE AS STRING), 10),
> (1002, 2, 'user2', CAST(CURRENT_DATE AS STRING), 20),
> (1003, 3, 'user3', CAST(CURRENT_DATE AS STRING), 30),
> (1004, 4, 'user4', CAST(CURRENT_DATE AS STRING), 40);
Flink SQL> ALTER MATERIALIZED TABLE full_users_shops RESUME;
[INFO] Execute statement succeeded.
Flink SQL> select * from full_users_shops;
[INFO] Result retrieval cancelled.|
The values selected from full_users_shops are as expected
!image-2025-02-27-19-59-44-183.png|width=668,height=97!
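Besides waiting for the scheduler, a full-mode table can also be refreshed on demand; a sketch (the partition value is illustrative):
|Java
ALTER MATERIALIZED TABLE full_users_shops REFRESH PARTITION (ds = '2024-06-19');|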
*Yarn-Session*
Start sql-client in Yarn-session mode
|Java
Flink SQL> SET 'execution.mode' = 'yarn-session';
[INFO] Execute statement succeeded.|
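yarn-session mode submits the refresh jobs to an existing session cluster, so one has to be running first; a minimal sketch:
|Java
# start a detached Yarn session cluster (minimal flags)
./bin/yarn-session.sh --detached|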
*Continuous Mode Materialized Table*
Create the materialized table in the same way as in yarn-application mode
|Java
Flink SQL> ALTER MATERIALIZED TABLE continuous_users_shops SUSPEND;
[INFO] Execute statement succeeded.
Flink SQL> ALTER MATERIALIZED TABLE continuous_users_shops RESUME;
[INFO] Execute statement succeeded.
Flink SQL> INSERT INTO json_source VALUES (1010, 300, 'user1', '2024-06-19',
10);
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 1cbf3f45d766287b99846fbc97d961ac|
I can get the data from continuous_users_shops; it works.
!image-2025-02-27-20-00-01-252.png|width=697,height=73!
*Full Mode Materialized Table*
|Java
Flink SQL> ALTER MATERIALIZED TABLE full_users_shops SUSPEND;
[INFO] Execute statement succeeded.
Flink SQL> ALTER MATERIALIZED TABLE full_users_shops RESUME;
[INFO] Execute statement succeeded.
Flink SQL> INSERT INTO json_source VALUES
> (1005, 5, 'user5', CAST(CURRENT_DATE AS STRING), 10),
> (1006, 6, 'user6', CAST(CURRENT_DATE AS STRING), 20),
> (1007, 7, 'user7', CAST(CURRENT_DATE AS STRING), 30),
> (1008, 8, 'user8', CAST(CURRENT_DATE AS STRING), 40);
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 40035f01c8991ebfa25716c249bac816
Flink SQL> select * from full_users_shops;
[INFO] Result retrieval cancelled.|
I can get the data from full_users_shops; it works.
!image-2025-02-27-20-00-18-975.png|width=714,height=107!
*K8s*
*Environment Preparation*
Start minikube as k8s cluster
|Java
minikube start|
Prepare S3 environment
|Java
docker run -d -P \
  --mount type=bind,source="./s3_test_data",target=/data \
  -e "MINIO_ACCESS_KEY=access_key" -e "MINIO_SECRET_KEY=secret_key" \
  -e "MINIO_DOMAIN=localhost" \
  minio/minio server /data
# use `docker ps` to find the container id
export S3_ENDPOINT="http://$(docker port "890eb041caef" 9000 | grep -F '0.0.0.0' | sed 's/0\.0\.0\.0/localhost/')"
# replace localhost with the local IP
S3_ENDPOINT=$(echo $S3_ENDPOINT | sed 's/localhost/10.189.47.244/')
mc alias set myminio $S3_ENDPOINT access_key secret_key
# create the required S3 "directories" by uploading and then removing a placeholder file
mc cp README.txt myminio/test-bucket/liyant-test/catalog_store_path/.emptyfile
mc cp README.txt myminio/test-bucket/liyant-test/catalog_path/.emptyfile
mc cp README.txt myminio/test-bucket/liyant-test/catalog_path/mydb/.emptyfile
mc cp README.txt myminio/test-bucket/liyant-test/checkpoints_path/.emptyfile
mc cp README.txt myminio/test-bucket/liyant-test/savepoints_path/.emptyfile
mc rm myminio/test-bucket/liyant-test/catalog_store_path/.emptyfile
mc rm myminio/test-bucket/liyant-test/catalog_path/.emptyfile
mc rm myminio/test-bucket/liyant-test/catalog_path/mydb/.emptyfile
mc rm myminio/test-bucket/liyant-test/checkpoints_path/.emptyfile
mc rm myminio/test-bucket/liyant-test/savepoints_path/.emptyfile
mc cp README.txt myminio/test-bucket/liyant-test/catalog_path/mydb/.emptyfile|
Prepare flink/lib
|Java
flink-cep-2.0-SNAPSHOT.jar
flink-connector-files-2.0-SNAPSHOT.jar
flink-csv-2.0-SNAPSHOT.jar
flink-dist-2.0-SNAPSHOT.jar
flink-json-2.0-SNAPSHOT.jar
flink-migration-test-utils-2.0-SNAPSHOT.jar
flink-s3-fs-hadoop-2.0-SNAPSHOT.jar
flink-scala_2.12-2.0-SNAPSHOT.jar
flink-table-api-java-uber-2.0-SNAPSHOT.jar
flink-table-filesystem-test-utils-2.0-SNAPSHOT.jar
flink-table-planner-loader-2.0-SNAPSHOT.jar
flink-table-runtime-2.0-SNAPSHOT.jar
log4j-1.2-api-2.24.1.jar
log4j-api-2.24.1.jar
log4j-core-2.24.1.jar
log4j-slf4j-impl-2.24.1.jar|
Prepare flink/config.yaml
|Java
execution:
  checkpoints:
    dir: s3://test-bucket/liyang-test/checkpoints_path
# Configure file catalog store
table:
  catalog-store:
    kind: file
    file:
      path: 's3://test-bucket/liyang-test/catalog_store_path'
# Configure embedded scheduler
workflow-scheduler:
  type: embedded
# Configure SQL gateway address and port
sql-gateway:
  endpoint:
    rest:
      address: 127.0.0.1
      port: 8083
s3.access-key: access_key
s3.secret-key: secret_key
s3.endpoint: http://10.189.47.244:32770|
Prepare the Dockerfile
|Java
FROM flink:2.0-scala_2.12-java17
ARG FLINK_PATH=/opt/flink
ARG LOCAL_FLINK_PATH=flink-2.0-SNAPSHOT
RUN rm -rf $FLINK_PATH/lib
RUN rm -rf $FLINK_PATH/opt
RUN rm -rf $FLINK_PATH/bin
COPY $LOCAL_FLINK_PATH/lib $FLINK_PATH/lib
COPY $LOCAL_FLINK_PATH/opt $FLINK_PATH/opt
COPY $LOCAL_FLINK_PATH/bin $FLINK_PATH/bin
COPY $LOCAL_FLINK_PATH/conf/config.yaml $FLINK_PATH/conf/config.yaml
RUN chown -R flink:flink $FLINK_PATH|
Create the image based on the official image
|Java
docker build -t test_kubernetes_application-1:latest .|
Load the image into the minikube cluster
|Java
docker image save test_kubernetes_application-1:latest -o
test_kubernetes_application-1
minikube image load test_kubernetes_application-1|
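To confirm the image is visible inside minikube (a quick check):
|Java
minikube image ls | grep test_kubernetes_application-1|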
*k8s - Application*
Start the Flink SQL client, connect it to the SQL gateway running on localhost at port 8083, and configure it to deploy Flink jobs in kubernetes-application mode on the Kubernetes cluster.
|Java
./bin/sql-client.sh gateway --endpoint 127.0.0.1:8083 \
-Dexecution.target=kubernetes-application \
-Dkubernetes.container.image.ref=test_kubernetes_application-1:latest|
Repeat the steps from the Yarn validation.
*Continuous Mode Materialized Table*
|Java
Flink SQL> set 'execution.checkpointing.savepoint-dir' =
's3://test-bucket/liyant-test/savepoints_path';
Flink SQL> set 'kubernetes.rest-service.exposed.type' = 'NodePort';
Flink SQL> CREATE CATALOG mt_cat WITH ('type' = 'test-filesystem','path' =
's3://test-bucket/liyant-test/catalog_path','default-database' = 'mydb');
Flink SQL> use catalog mt_cat;|
Create a continuous mode materialized table.
|Java
CREATE TABLE test_source(a varchar) with ('connector' = 'datagen');
CREATE MATERIALIZED TABLE continuous_users_shops
WITH (
'format' = 'debezium-json',
'sink.rolling-policy.rollover-interval' = '10s',
'sink.rolling-policy.check-interval' = '10s'
)
FRESHNESS = INTERVAL '30' SECOND
AS SELECT
a from test_source;|
The validation results are shown below.
|Java
Flink SQL> alter MATERIALIZED TABLE continuous_users_shops suspend;
[INFO] Execute statement succeeded.
Flink SQL> alter MATERIALIZED TABLE continuous_users_shops resume;
[INFO] Execute statement succeeded.|
The job restored from the savepoint successfully.
!image-2025-02-27-20-01-09-371.png|width=552,height=132!
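The savepoint files can also be checked directly in MinIO via the alias configured earlier:
|Java
# list savepoints written by the SUSPEND
mc ls myminio/test-bucket/liyant-test/savepoints_path/|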
*Full Mode Materialized Table*
Set the session options and switch to the catalog.
|Java
Flink SQL> set 'kubernetes.rest-service.exposed.type' = 'NodePort';
Flink SQL> use catalog mt_cat;|
Create materialized table.
|Java
Flink SQL> CREATE TABLE json_source (
> order_id BIGINT,
> user_id BIGINT,
> user_name STRING,
> order_created_at STRING,
> payment_amount_cents BIGINT) WITH ('format' =
> 'json','source.monitor-interval' = '10s');
[INFO] Execute statement succeeded.
Flink SQL> CREATE MATERIALIZED TABLE full_users_shops2
> PARTITIONED BY (ds)
> WITH (
> 'format' = 'json',
> 'partition.fields.ds.date-formatter' = 'yyyy-MM-dd'
> )
> FRESHNESS = INTERVAL '1' MINUTE
> REFRESH_MODE = FULL
> AS SELECT
> user_id,
> ds,
> SUM (payment_amount_cents) AS payed_buy_fee_sum,
> SUM (1) AS PV
> FROM (
> SELECT user_id, order_created_at AS ds, payment_amount_cents
> FROM json_source
> ) AS tmp
> GROUP BY user_id, ds;|
The validation results are shown below.
|Java
Flink SQL> alter MATERIALIZED TABLE full_users_shops2 suspend;
[INFO] Execute statement succeeded.
Flink SQL> alter MATERIALIZED TABLE full_users_shops2 resume;
[INFO] Execute statement succeeded.|
The batch job can be scheduled.
!image-2025-02-27-20-01-25-183.png|width=680,height=235!
*k8s - Session*
Deploy Session
|Java
./bin/kubernetes-session.sh -Dkubernetes.cluster-id=my-first-flink-cluster
-Dkubernetes.rest-service.exposed.type=NodePort
-Dkubernetes.container.image.ref=test_kubernetes_application-1:latest|
Start the Flink SQL client, connect it to the SQL gateway running on localhost at port 8083, and configure it to deploy Flink jobs in kubernetes-session mode on the Kubernetes cluster.
|Java
./bin/sql-client.sh gateway --endpoint 127.0.0.1:8083 \
-Dexecution.target=kubernetes-session \
-Dkubernetes.container.image.ref=test_kubernetes_application-1:latest
Flink SQL> set 'execution.checkpointing.savepoint-dir' =
's3://test-bucket/liyant-test/savepoints_path';
Flink SQL> set 'kubernetes.rest-service.exposed.type' = 'NodePort';
Flink SQL> set 'kubernetes.cluster-id' = 'my-first-flink-cluster';
Flink SQL> use catalog mt_cat;|
*Continuous Mode Materialized Table*
|Java
Flink SQL> alter MATERIALIZED table continuous_users_shops suspend;
[INFO] Execute statement succeeded.
Flink SQL> alter MATERIALIZED table continuous_users_shops resume;
[INFO] Execute statement succeeded.|
|Java
-> kubectl get pods
NAME READY STATUS RESTARTS AGE
my-first-flink-cluster-8d54f8df4-bfbdh 1/1 Running 0 26m
*my-first-flink-cluster-taskmanager-1-3 1/1 Running 0 5s*|
*Full Mode Materialized Table*
|Java
Flink SQL> alter MATERIALIZED TABLE full_users_shops suspend;
[INFO] Execute statement succeeded.
Flink SQL> alter MATERIALIZED TABLE full_users_shops resume;
[INFO] Execute statement succeeded.|
The batch job can be scheduled.
|Java
NAME READY STATUS RESTARTS AGE
my-first-flink-cluster-8d54f8df4-cdmwn 1/1 Running 0 2m24s
*my-first-flink-cluster-taskmanager-1-1 1/1 Running 0 10s*
*my-first-flink-cluster-taskmanager-1-2 1/1 Running 0 10s*
*my-first-flink-cluster-taskmanager-1-3 1/1 Running 0 10s*
*my-first-flink-cluster-taskmanager-1-4 1/1 Running 0 10s*
*my-first-flink-cluster-taskmanager-1-5 1/1 Running 0 9s*|
> Cross-team verification for FLINK-37133 "Submitting Refresh Job of
> Materialized Table to Yarn/K8s"
> --------------------------------------------------------------------------------------------------
>
> Key: FLINK-37188
> URL: https://issues.apache.org/jira/browse/FLINK-37188
> Project: Flink
> Issue Type: Sub-task
> Reporter: Feng Jin
> Assignee: Yang Li
> Priority: Major
> Fix For: 2.0.0
>
> Attachments: image-2025-02-27-19-55-42-747.png,
> image-2025-02-27-19-59-28-286.png, image-2025-02-27-19-59-44-183.png,
> image-2025-02-27-20-00-01-252.png, image-2025-02-27-20-00-18-975.png,
> image-2025-02-27-20-01-09-371.png, image-2025-02-27-20-01-25-183.png
>
>
> *Operation steps:*
> 1. Prepare cluster environment including k8s/yarn.
> 2. Refer to
> quickstart([https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/materialized-table/quickstart/])
> to prepare the environment required for Materialized table: including
> catalog store/test-filesystem plugin/sql gateway.
> 3. Create Materialized Tables in two modes (Continuous and Full) in different
> deployment mode (yarn-session, yarn-application, kubernetes-session,
> kubernetes-application)
> 4. Alter Suspend/Resume the materialized table
>
> *Verify The result:*
> Verify that the creation, suspension, and resumption of the Materialized
> table can be performed normally across different environments.
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)