This is an automated email from the ASF dual-hosted git repository. zhuzh pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 356e2af4b27b5a7250d9780ee7b41ce945b98c1e Author: JunRuiLee <[email protected]> AuthorDate: Wed Sep 11 09:55:16 2024 +0800 [FLINK-36185][e2e] Disable PyFlink end-to-end case that uses Kafka legacy source. --- .../test-scripts/test_pyflink.sh | 180 +++++++++++---------- 1 file changed, 98 insertions(+), 82 deletions(-) diff --git a/flink-end-to-end-tests/test-scripts/test_pyflink.sh b/flink-end-to-end-tests/test-scripts/test_pyflink.sh index a078e84da6b..9eb688ce2ed 100755 --- a/flink-end-to-end-tests/test-scripts/test_pyflink.sh +++ b/flink-end-to-end-tests/test-scripts/test_pyflink.sh @@ -61,7 +61,14 @@ source "${CURRENT_DIR}"/kafka_sql_common.sh \ function test_clean_up { stop_cluster - stop_kafka_cluster + # Note: The 'data_stream_job.py' still uses the Kafka legacy source, + # so we temporarily disable this case because we plan to address this + # in future updates. Once we align with the upcoming PyFlink and Flink 2.0 + # compatibility efforts, we will remove this temporary disable. + # + # So we should also skip stopping the Kafka cluster because we did not start the Kafka cluster. + + # stop_kafka_cluster } on_exit test_clean_up @@ -191,90 +198,99 @@ JOB_ID=$($FLINK_DIR/bin/sql-client.sh \ wait_job_terminal_state "$JOB_ID" "FINISHED" -echo "Test PyFlink DataStream job:" +# Note: The 'data_stream_job.py' still uses the Kafka legacy source, +# so we temporarily disable this case because we plan to address this +# in future updates. Once we align with the upcoming PyFlink and Flink 2.0 +# compatibility efforts, we will remove this temporary disable. -# prepare Kafka -echo "Preparing Kafka..." - -setup_kafka_dist - -start_kafka_cluster - -# End to end test for DataStream ProcessFunction with timer -create_kafka_topic 1 1 timer-stream-source -create_kafka_topic 1 1 timer-stream-sink - -PAYMENT_MSGS='{"createTime": 1603679413000, "orderId": 1603679414, "payAmount": 83685.44904332698, "payPlatform": 0, "provinceId": 3} -{"createTime": 1603679426000, "orderId": 1603679427, "payAmount": 30092.50657757042, "payPlatform": 0, "provinceId": 1} -{"createTime": 1603679427000, "orderId": 1603679428, "payAmount": 62644.01719293056, "payPlatform": 0, "provinceId": 6} -{"createTime": 1603679428000, "orderId": 1603679429, "payAmount": 6449.806795118451, "payPlatform": 0, "provinceId": 2} -{"createTime": 1603679491000, "orderId": 1603679492, "payAmount": 41108.36128417494, "payPlatform": 0, "provinceId": 0} -{"createTime": 1603679492000, "orderId": 1603679493, "payAmount": 64882.44233197067, "payPlatform": 0, "provinceId": 4} -{"createTime": 1603679521000, "orderId": 1603679522, "payAmount": 81648.80712644062, "payPlatform": 0, "provinceId": 3} -{"createTime": 1603679522000, "orderId": 1603679523, "payAmount": 81861.73063103345, "payPlatform": 0, "provinceId": 4}' - -function send_msg_to_kafka { - while read line - do - send_messages_to_kafka "$line" "timer-stream-source" - sleep 1 - done <<< "$1" -} - -function read_msg_from_kafka { - $KAFKA_DIR/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning \ - --max-messages $1 \ - --topic $2 \ - --consumer-property group.id=$3 --timeout-ms 90000 2> /dev/null -} - -send_msg_to_kafka "${PAYMENT_MSGS[*]}" - -JOB_ID=$(${FLINK_DIR}/bin/flink run \ - -pyfs "${FLINK_PYTHON_TEST_DIR}/python/datastream" \ - -pyreq "${REQUIREMENTS_PATH}" \ - -pyarch "${TEST_DATA_DIR}/venv.zip" \ - -pyexec "venv.zip/.conda/bin/python" \ - -pym "data_stream_job" \ - -j "${KAFKA_SQL_JAR}") - -echo "${JOB_ID}" -JOB_ID=`echo "${JOB_ID}" | sed 's/.* //g'` - -wait_job_running ${JOB_ID} - -echo "Reading kafka messages..." -READ_MSG=$(read_msg_from_kafka 16 timer-stream-sink pyflink-e2e-test-timer) +# echo "Test PyFlink DataStream job:" +# +# # Prepare Kafka for the DataStream job +# echo "Preparing Kafka..." +# +# # Setup Kafka distribution +# setup_kafka_dist +# +# # Start Kafka cluster +# start_kafka_cluster +# +# # End to end test for DataStream ProcessFunction with timer +# # Creating necessary Kafka topics for the test +# create_kafka_topic 1 1 timer-stream-source +# create_kafka_topic 1 1 timer-stream-sink +# +# # Preparing a set of payment messages in JSON format for the test +# PAYMENT_MSGS='{"createTime": 1603679413000, "orderId": 1603679414, "payAmount": 83685.44904332698, "payPlatform": 0, "provinceId": 3} +# {"createTime": 1603679426000, "orderId": 1603679427, "payAmount": 30092.50657757042, "payPlatform": 0, "provinceId": 1} +# {"createTime": 1603679427000, "orderId": 1603679428, "payAmount": 62644.01719293056, "payPlatform": 0, "provinceId": 6} +# {"createTime": 1603679428000, "orderId": 1603679429, "payAmount": 6449.806795118451, "payPlatform": 0, "provinceId": 2} +# {"createTime": 1603679491000, "orderId": 1603679492, "payAmount": 41108.36128417494, "payPlatform": 0, "provinceId": 0} +# {"createTime": 1603679492000, "orderId": 1603679493, "payAmount": 64882.44233197067, "payPlatform": 0, "provinceId": 4} +# {"createTime": 1603679521000, "orderId": 1603679522, "payAmount": 81648.80712644062, "payPlatform": 0, "provinceId": 3} +# {"createTime": 1603679522000, "orderId": 1603679523, "payAmount": 81861.73063103345, "payPlatform": 0, "provinceId": 4}' + +# function send_msg_to_kafka { +# while read line +# do +# send_messages_to_kafka "$line" "timer-stream-source" +# sleep 1 +# done <<< "$1" +# } + +# function read_msg_from_kafka { +# $KAFKA_DIR/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning \ +# --max-messages $1 \ +# --topic $2 \ +# --consumer-property group.id=$3 --timeout-ms 90000 2> /dev/null +# } + +# send_msg_to_kafka "${PAYMENT_MSGS[*]}" + +# JOB_ID=$(${FLINK_DIR}/bin/flink run \ +# -pyfs "${FLINK_PYTHON_TEST_DIR}/python/datastream" \ +# -pyreq "${REQUIREMENTS_PATH}" \ +# -pyarch "${TEST_DATA_DIR}/venv.zip" \ +# -pyexec "venv.zip/.conda/bin/python" \ +# -pym "data_stream_job" \ +# -j "${KAFKA_SQL_JAR}") + +# echo "${JOB_ID}" +# JOB_ID=`echo "${JOB_ID}" | sed 's/.* //g'` + +# wait_job_running ${JOB_ID} + +# echo "Reading kafka messages..." +# READ_MSG=$(read_msg_from_kafka 16 timer-stream-sink pyflink-e2e-test-timer) # We use env.execute_async() to submit the job, cancel it after fetched results. -cancel_job "${JOB_ID}" - -EXPECTED_MSG='Current key: 1603679414, orderId: 1603679414, payAmount: 83685.44904332698, timestamp: 1603679413000 -Current key: 1603679427, orderId: 1603679427, payAmount: 30092.50657757042, timestamp: 1603679426000 -Current key: 1603679428, orderId: 1603679428, payAmount: 62644.01719293056, timestamp: 1603679427000 -Current key: 1603679429, orderId: 1603679429, payAmount: 6449.806795118451, timestamp: 1603679428000 -Current key: 1603679492, orderId: 1603679492, payAmount: 41108.36128417494, timestamp: 1603679491000 -Current key: 1603679493, orderId: 1603679493, payAmount: 64882.44233197067, timestamp: 1603679492000 -Current key: 1603679522, orderId: 1603679522, payAmount: 81648.80712644062, timestamp: 1603679521000 -Current key: 1603679523, orderId: 1603679523, payAmount: 81861.73063103345, timestamp: 1603679522000 -On timer timestamp: -9223372036854774308 -On timer timestamp: -9223372036854774308 -On timer timestamp: -9223372036854774308 -On timer timestamp: -9223372036854774308 -On timer timestamp: -9223372036854774308 -On timer timestamp: -9223372036854774308 -On timer timestamp: -9223372036854774308 -On timer timestamp: -9223372036854774308' - -EXPECTED_MSG=$(sort_msg "${EXPECTED_MSG[*]}") -SORTED_READ_MSG=$(sort_msg "${READ_MSG[*]}") - -if [[ "${EXPECTED_MSG[*]}" != "${SORTED_READ_MSG[*]}" ]]; then - echo "Output from Flink program does not match expected output." - echo -e "EXPECTED Output: --${EXPECTED_MSG[*]}--" - echo -e "ACTUAL: --${SORTED_READ_MSG[*]}--" - exit 1 -fi +# cancel_job "${JOB_ID}" + +# EXPECTED_MSG='Current key: 1603679414, orderId: 1603679414, payAmount: 83685.44904332698, timestamp: 1603679413000 +# Current key: 1603679427, orderId: 1603679427, payAmount: 30092.50657757042, timestamp: 1603679426000 +# Current key: 1603679428, orderId: 1603679428, payAmount: 62644.01719293056, timestamp: 1603679427000 +# Current key: 1603679429, orderId: 1603679429, payAmount: 6449.806795118451, timestamp: 1603679428000 +# Current key: 1603679492, orderId: 1603679492, payAmount: 41108.36128417494, timestamp: 1603679491000 +# Current key: 1603679493, orderId: 1603679493, payAmount: 64882.44233197067, timestamp: 1603679492000 +# Current key: 1603679522, orderId: 1603679522, payAmount: 81648.80712644062, timestamp: 1603679521000 +# Current key: 1603679523, orderId: 1603679523, payAmount: 81861.73063103345, timestamp: 1603679522000 +# On timer timestamp: -9223372036854774308 +# On timer timestamp: -9223372036854774308 +# On timer timestamp: -9223372036854774308 +# On timer timestamp: -9223372036854774308 +# On timer timestamp: -9223372036854774308 +# On timer timestamp: -9223372036854774308 +# On timer timestamp: -9223372036854774308 +# On timer timestamp: -9223372036854774308' + +# EXPECTED_MSG=$(sort_msg "${EXPECTED_MSG[*]}") +# SORTED_READ_MSG=$(sort_msg "${READ_MSG[*]}") + +# if [[ "${EXPECTED_MSG[*]}" != "${SORTED_READ_MSG[*]}" ]]; then +# echo "Output from Flink program does not match expected output." +# echo -e "EXPECTED Output: --${EXPECTED_MSG[*]}--" +# echo -e "ACTUAL: --${SORTED_READ_MSG[*]}--" +# exit 1 +# fi # clean up python env "${FLINK_PYTHON_DIR}/dev/lint-python.sh" -r
