This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 356e2af4b27b5a7250d9780ee7b41ce945b98c1e
Author: JunRuiLee <[email protected]>
AuthorDate: Wed Sep 11 09:55:16 2024 +0800

    [FLINK-36185][e2e] Disable PyFlink end-to-end case that uses Kafka legacy source.
---
 .../test-scripts/test_pyflink.sh                   | 180 +++++++++++----------
 1 file changed, 98 insertions(+), 82 deletions(-)

diff --git a/flink-end-to-end-tests/test-scripts/test_pyflink.sh b/flink-end-to-end-tests/test-scripts/test_pyflink.sh
index a078e84da6b..9eb688ce2ed 100755
--- a/flink-end-to-end-tests/test-scripts/test_pyflink.sh
+++ b/flink-end-to-end-tests/test-scripts/test_pyflink.sh
@@ -61,7 +61,14 @@ source "${CURRENT_DIR}"/kafka_sql_common.sh \
 
 function test_clean_up {
     stop_cluster
-    stop_kafka_cluster
+    # Note: The 'data_stream_job.py' still uses the Kafka legacy source,
+    # so we temporarily disable this case because we plan to address this
+    # in future updates. Once we align with the upcoming PyFlink and Flink 2.0
+    # compatibility efforts, we will remove this temporary disable.
+    #
+    # So we should also skip stopping the Kafka cluster because we did not start the Kafka cluster.
+
+    # stop_kafka_cluster
 }
 on_exit test_clean_up
 
@@ -191,90 +198,99 @@ JOB_ID=$($FLINK_DIR/bin/sql-client.sh \
 
 wait_job_terminal_state "$JOB_ID" "FINISHED"
 
-echo "Test PyFlink DataStream job:"
+# Note: The 'data_stream_job.py' still uses the Kafka legacy source,
+# so we temporarily disable this case because we plan to address this
+# in future updates. Once we align with the upcoming PyFlink and Flink 2.0
+# compatibility efforts, we will remove this temporary disable.
 
-# prepare Kafka
-echo "Preparing Kafka..."
-
-setup_kafka_dist
-
-start_kafka_cluster
-
-# End to end test for DataStream ProcessFunction with timer
-create_kafka_topic 1 1 timer-stream-source
-create_kafka_topic 1 1 timer-stream-sink
-
-PAYMENT_MSGS='{"createTime": 1603679413000, "orderId": 1603679414, "payAmount": 83685.44904332698, "payPlatform": 0, "provinceId": 3}
-{"createTime": 1603679426000, "orderId": 1603679427, "payAmount": 30092.50657757042, "payPlatform": 0, "provinceId": 1}
-{"createTime": 1603679427000, "orderId": 1603679428, "payAmount": 62644.01719293056, "payPlatform": 0, "provinceId": 6}
-{"createTime": 1603679428000, "orderId": 1603679429, "payAmount": 6449.806795118451, "payPlatform": 0, "provinceId": 2}
-{"createTime": 1603679491000, "orderId": 1603679492, "payAmount": 41108.36128417494, "payPlatform": 0, "provinceId": 0}
-{"createTime": 1603679492000, "orderId": 1603679493, "payAmount": 64882.44233197067, "payPlatform": 0, "provinceId": 4}
-{"createTime": 1603679521000, "orderId": 1603679522, "payAmount": 81648.80712644062, "payPlatform": 0, "provinceId": 3}
-{"createTime": 1603679522000, "orderId": 1603679523, "payAmount": 81861.73063103345, "payPlatform": 0, "provinceId": 4}'
-
-function send_msg_to_kafka {
-    while read line
-    do
-           send_messages_to_kafka "$line" "timer-stream-source"
-        sleep 1
-    done <<< "$1"
-}
-
-function read_msg_from_kafka {
-    $KAFKA_DIR/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning \
-    --max-messages $1 \
-    --topic $2 \
-    --consumer-property group.id=$3 --timeout-ms 90000 2> /dev/null
-}
-
-send_msg_to_kafka "${PAYMENT_MSGS[*]}"
-
-JOB_ID=$(${FLINK_DIR}/bin/flink run \
-    -pyfs "${FLINK_PYTHON_TEST_DIR}/python/datastream" \
-    -pyreq "${REQUIREMENTS_PATH}" \
-    -pyarch "${TEST_DATA_DIR}/venv.zip" \
-    -pyexec "venv.zip/.conda/bin/python" \
-    -pym "data_stream_job" \
-    -j "${KAFKA_SQL_JAR}")
-
-echo "${JOB_ID}"
-JOB_ID=`echo "${JOB_ID}" | sed 's/.* //g'`
-
-wait_job_running ${JOB_ID}
-
-echo "Reading kafka messages..."
-READ_MSG=$(read_msg_from_kafka 16 timer-stream-sink pyflink-e2e-test-timer)
+# echo "Test PyFlink DataStream job:"
+#
+# # Prepare Kafka for the DataStream job
+# echo "Preparing Kafka..."
+#
+# # Setup Kafka distribution
+# setup_kafka_dist
+#
+# # Start Kafka cluster
+# start_kafka_cluster
+#
+# # End to end test for DataStream ProcessFunction with timer
+# # Creating necessary Kafka topics for the test
+# create_kafka_topic 1 1 timer-stream-source
+# create_kafka_topic 1 1 timer-stream-sink
+#
+# # Preparing a set of payment messages in JSON format for the test
+# PAYMENT_MSGS='{"createTime": 1603679413000, "orderId": 1603679414, "payAmount": 83685.44904332698, "payPlatform": 0, "provinceId": 3}
+# {"createTime": 1603679426000, "orderId": 1603679427, "payAmount": 30092.50657757042, "payPlatform": 0, "provinceId": 1}
+# {"createTime": 1603679427000, "orderId": 1603679428, "payAmount": 62644.01719293056, "payPlatform": 0, "provinceId": 6}
+# {"createTime": 1603679428000, "orderId": 1603679429, "payAmount": 6449.806795118451, "payPlatform": 0, "provinceId": 2}
+# {"createTime": 1603679491000, "orderId": 1603679492, "payAmount": 41108.36128417494, "payPlatform": 0, "provinceId": 0}
+# {"createTime": 1603679492000, "orderId": 1603679493, "payAmount": 64882.44233197067, "payPlatform": 0, "provinceId": 4}
+# {"createTime": 1603679521000, "orderId": 1603679522, "payAmount": 81648.80712644062, "payPlatform": 0, "provinceId": 3}
+# {"createTime": 1603679522000, "orderId": 1603679523, "payAmount": 81861.73063103345, "payPlatform": 0, "provinceId": 4}'
+
+# function send_msg_to_kafka {
+#     while read line
+#     do
+#          send_messages_to_kafka "$line" "timer-stream-source"
+#         sleep 1
+#     done <<< "$1"
+# }
+
+# function read_msg_from_kafka {
+#     $KAFKA_DIR/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning \
+#     --max-messages $1 \
+#     --topic $2 \
+#     --consumer-property group.id=$3 --timeout-ms 90000 2> /dev/null
+# }
+
+# send_msg_to_kafka "${PAYMENT_MSGS[*]}"
+
+# JOB_ID=$(${FLINK_DIR}/bin/flink run \
+#     -pyfs "${FLINK_PYTHON_TEST_DIR}/python/datastream" \
+#     -pyreq "${REQUIREMENTS_PATH}" \
+#     -pyarch "${TEST_DATA_DIR}/venv.zip" \
+#     -pyexec "venv.zip/.conda/bin/python" \
+#     -pym "data_stream_job" \
+#     -j "${KAFKA_SQL_JAR}")
+
+# echo "${JOB_ID}"
+# JOB_ID=`echo "${JOB_ID}" | sed 's/.* //g'`
+
+# wait_job_running ${JOB_ID}
+
+# echo "Reading kafka messages..."
+# READ_MSG=$(read_msg_from_kafka 16 timer-stream-sink pyflink-e2e-test-timer)
 
 # We use env.execute_async() to submit the job, cancel it after fetched results.
-cancel_job "${JOB_ID}"
-
-EXPECTED_MSG='Current key: 1603679414, orderId: 1603679414, payAmount: 83685.44904332698, timestamp: 1603679413000
-Current key: 1603679427, orderId: 1603679427, payAmount: 30092.50657757042, timestamp: 1603679426000
-Current key: 1603679428, orderId: 1603679428, payAmount: 62644.01719293056, timestamp: 1603679427000
-Current key: 1603679429, orderId: 1603679429, payAmount: 6449.806795118451, timestamp: 1603679428000
-Current key: 1603679492, orderId: 1603679492, payAmount: 41108.36128417494, timestamp: 1603679491000
-Current key: 1603679493, orderId: 1603679493, payAmount: 64882.44233197067, timestamp: 1603679492000
-Current key: 1603679522, orderId: 1603679522, payAmount: 81648.80712644062, timestamp: 1603679521000
-Current key: 1603679523, orderId: 1603679523, payAmount: 81861.73063103345, timestamp: 1603679522000
-On timer timestamp: -9223372036854774308
-On timer timestamp: -9223372036854774308
-On timer timestamp: -9223372036854774308
-On timer timestamp: -9223372036854774308
-On timer timestamp: -9223372036854774308
-On timer timestamp: -9223372036854774308
-On timer timestamp: -9223372036854774308
-On timer timestamp: -9223372036854774308'
-
-EXPECTED_MSG=$(sort_msg "${EXPECTED_MSG[*]}")
-SORTED_READ_MSG=$(sort_msg "${READ_MSG[*]}")
-
-if [[ "${EXPECTED_MSG[*]}" != "${SORTED_READ_MSG[*]}" ]]; then
-    echo "Output from Flink program does not match expected output."
-    echo -e "EXPECTED Output: --${EXPECTED_MSG[*]}--"
-    echo -e "ACTUAL: --${SORTED_READ_MSG[*]}--"
-    exit 1
-fi
+# cancel_job "${JOB_ID}"
+
+# EXPECTED_MSG='Current key: 1603679414, orderId: 1603679414, payAmount: 83685.44904332698, timestamp: 1603679413000
+# Current key: 1603679427, orderId: 1603679427, payAmount: 30092.50657757042, timestamp: 1603679426000
+# Current key: 1603679428, orderId: 1603679428, payAmount: 62644.01719293056, timestamp: 1603679427000
+# Current key: 1603679429, orderId: 1603679429, payAmount: 6449.806795118451, timestamp: 1603679428000
+# Current key: 1603679492, orderId: 1603679492, payAmount: 41108.36128417494, timestamp: 1603679491000
+# Current key: 1603679493, orderId: 1603679493, payAmount: 64882.44233197067, timestamp: 1603679492000
+# Current key: 1603679522, orderId: 1603679522, payAmount: 81648.80712644062, timestamp: 1603679521000
+# Current key: 1603679523, orderId: 1603679523, payAmount: 81861.73063103345, timestamp: 1603679522000
+# On timer timestamp: -9223372036854774308
+# On timer timestamp: -9223372036854774308
+# On timer timestamp: -9223372036854774308
+# On timer timestamp: -9223372036854774308
+# On timer timestamp: -9223372036854774308
+# On timer timestamp: -9223372036854774308
+# On timer timestamp: -9223372036854774308
+# On timer timestamp: -9223372036854774308'
+
+# EXPECTED_MSG=$(sort_msg "${EXPECTED_MSG[*]}")
+# SORTED_READ_MSG=$(sort_msg "${READ_MSG[*]}")
+
+# if [[ "${EXPECTED_MSG[*]}" != "${SORTED_READ_MSG[*]}" ]]; then
+#     echo "Output from Flink program does not match expected output."
+#     echo -e "EXPECTED Output: --${EXPECTED_MSG[*]}--"
+#     echo -e "ACTUAL: --${SORTED_READ_MSG[*]}--"
+#     exit 1
+# fi
 
 # clean up python env
 "${FLINK_PYTHON_DIR}/dev/lint-python.sh" -r

Reply via email to