This is an automated email from the ASF dual-hosted git repository.
mapohl pushed a commit to branch release-1.18
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.18 by this push:
new a15515ebc0e [FLINK-34324][test] Makes all s3-related operations declared and called in a single location
a15515ebc0e is described below
commit a15515ebc0e4c59ea0642e745e942591c28b3a3c
Author: Matthias Pohl <[email protected]>
AuthorDate: Wed Jan 31 15:02:24 2024 +0100
[FLINK-34324][test] Makes all s3-related operations declared and called in a single location
---
.../test-scripts/test_file_sink.sh | 111 +++++++++++----------
1 file changed, 59 insertions(+), 52 deletions(-)
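The commit consolidates all S3-specific behavior into a single branch of the script: common_s3.sh is only sourced when OUT_TYPE is "s3", and the S3 variants of the helper functions are defined there, shadowing the local defaults. Bash always calls the most recently defined version of a function, which is what makes this override pattern work. A minimal, self-contained sketch of the idea (hypothetical names, not the actual test code):

    #!/usr/bin/env bash
    OUT_TYPE="${1:-local}"

    # default implementation, used for local runs
    function get_result {
      cat "/tmp/out/part-0"
    }

    if [ "${OUT_TYPE}" == "s3" ]; then
      # redefining the function shadows the local variant above
      function get_result {
        # download step would go here (stubbed for this sketch)
        echo "downloading from s3..." >&2
        cat "/tmp/out/part-0"
      }
    fi

    get_result

As the first hunk below shows, the script takes the output type and the sink under test as positional parameters, defaulting to "local" and "StreamingFileSink".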
diff --git a/flink-end-to-end-tests/test-scripts/test_file_sink.sh b/flink-end-to-end-tests/test-scripts/test_file_sink.sh
index 711f74b6672..5ed1fda2c68 100755
--- a/flink-end-to-end-tests/test-scripts/test_file_sink.sh
+++ b/flink-end-to-end-tests/test-scripts/test_file_sink.sh
@@ -20,53 +20,16 @@
OUT_TYPE="${1:-local}" # other type: s3
SINK_TO_TEST="${2:-"StreamingFileSink"}"
-S3_PREFIX=temp/test_file_sink-$(uuidgen)
-OUTPUT_PATH="$TEST_DATA_DIR/$S3_PREFIX"
-S3_OUTPUT_PATH="s3://$IT_CASE_S3_BUCKET/$S3_PREFIX"
source "$(dirname "$0")"/common.sh
-if [ "${OUT_TYPE}" == "s3" ]; then
- source "$(dirname "$0")"/common_s3.sh
-else
- echo "S3 environment is not loaded for non-s3 test runs (test run type:
$OUT_TYPE)."
-fi
-
-# randomly set up openSSL with dynamically/statically linked libraries
-OPENSSL_LINKAGE=$(if (( RANDOM % 2 )) ; then echo "dynamic"; else echo "static"; fi)
-echo "Executing test with ${OPENSSL_LINKAGE} openSSL linkage (random selection between 'dynamic' and 'static')"
-
-s3_setup hadoop
-set_conf_ssl "mutual" "OPENSSL" "${OPENSSL_LINKAGE}"
-set_config_key "metrics.fetcher.update-interval" "2000"
-# this test relies on global failovers
-set_config_key "jobmanager.execution.failover-strategy" "full"
-
+# OUTPUT_PATH is a local folder that can be used as a download folder for remote data
+# the helper functions will access this folder
+RANDOM_PREFIX="temp/test_file_sink-$(uuidgen)"
+OUTPUT_PATH="$TEST_DATA_DIR/${RANDOM_PREFIX}"
mkdir -p $OUTPUT_PATH
-if [ "${OUT_TYPE}" == "local" ]; then
- echo "Use local output"
- JOB_OUTPUT_PATH=${OUTPUT_PATH}
-elif [ "${OUT_TYPE}" == "s3" ]; then
- echo "Use s3 output"
- JOB_OUTPUT_PATH=${S3_OUTPUT_PATH}
- set_config_key "state.checkpoints.dir" "s3://$IT_CASE_S3_BUCKET/$S3_PREFIX-chk"
- mkdir -p "$OUTPUT_PATH-chk"
-else
- echo "Unknown output type: ${OUT_TYPE}"
- exit 1
-fi
-
-# make sure we delete the file at the end
-function out_cleanup {
- s3_delete_by_full_path_prefix "$S3_PREFIX"
- s3_delete_by_full_path_prefix "${S3_PREFIX}-chk"
- rollback_openssl_lib
-}
-if [ "${OUT_TYPE}" == "s3" ]; then
- on_exit out_cleanup
-fi
-
-TEST_PROGRAM_JAR="${END_TO_END_DIR}/flink-file-sink-test/target/FileSinkProgram.jar"
+# JOB_OUTPUT_PATH is the location where the job writes its data to
+JOB_OUTPUT_PATH="${OUTPUT_PATH}"
###################################
# Get all lines in part files and sort them numerically.
@@ -79,9 +42,6 @@ TEST_PROGRAM_JAR="${END_TO_END_DIR}/flink-file-sink-test/target/FileSinkProgram.
# sorted content of part files
###################################
function get_complete_result {
- if [ "${OUT_TYPE}" == "s3" ]; then
- s3_get_by_full_path_and_filename_prefix "$OUTPUT_PATH" "$S3_PREFIX" "part-" true
- fi
find "${OUTPUT_PATH}" -type f \( -iname "part-*" \) -exec cat {} + | sort -g
}
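As an aside on the pipeline above: sort -g compares lines as general numeric values instead of lexicographically, which is what "sort them numerically" relies on. A quick illustration (assuming GNU coreutils sort):

    $ printf '10\n2\n1\n' | sort      # lexicographic order: 1, 10, 2
    $ printf '10\n2\n1\n' | sort -g   # general numeric order: 1, 2, 10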
@@ -89,20 +49,67 @@ function get_complete_result {
# Get total number of lines in part files.
#
# Globals:
-# S3_PREFIX
+# OUTPUT_PATH
# Arguments:
# None
# Returns:
# line number in part files
###################################
function get_total_number_of_valid_lines {
- if [ "${OUT_TYPE}" == "local" ]; then
- get_complete_result | wc -l | tr -d '[:space:]'
- elif [ "${OUT_TYPE}" == "s3" ]; then
- s3_get_number_of_lines_by_prefix "${S3_PREFIX}" "part-"
- fi
+ get_complete_result | wc -l | tr -d '[:space:]'
}
+if [ "${OUT_TYPE}" == "local" ]; then
+ echo "[INFO] Test run in local environment: No S3 environment is not loaded."
+elif [ "${OUT_TYPE}" == "s3" ]; then
+ # the s3 context requires additional setup
+ source "$(dirname "$0")"/common_s3.sh
+ s3_setup hadoop
+
+ # overwrites JOB_OUTPUT_PATH to point to S3
+ S3_DATA_PREFIX="${RANDOM_PREFIX}"
+ S3_CHECKPOINT_PREFIX="${RANDOM_PREFIX}-chk"
+ JOB_OUTPUT_PATH="s3://$IT_CASE_S3_BUCKET/${S3_DATA_PREFIX}"
+ set_config_key "state.checkpoints.dir" "s3://$IT_CASE_S3_BUCKET/${S3_CHECKPOINT_PREFIX}"
+
+ # overrides the implementation used for local runs
+ function get_complete_result {
+ # copies the data from S3 to the local OUTPUT_PATH
+ s3_get_by_full_path_and_filename_prefix "$OUTPUT_PATH" "${S3_DATA_PREFIX}" "part-" true
+
+ # and prints the sorted output
+ find "${OUTPUT_PATH}" -type f \( -iname "part-*" \) -exec cat {} + | sort -g
+ }
+
+ # overrides the implementation used for local runs
+ function get_total_number_of_valid_lines {
+ s3_get_number_of_lines_by_prefix "${S3_DATA_PREFIX}" "part-"
+ }
+
+ # make sure we delete the S3 files at the end
+ function out_cleanup {
+ s3_delete_by_full_path_prefix "${S3_DATA_PREFIX}"
+ s3_delete_by_full_path_prefix "${S3_CHECKPOINT_PREFIX}"
+ rollback_openssl_lib
+ }
+
+ on_exit out_cleanup
+else
+ echo "[ERROR] Unknown out type: ${OUT_TYPE}"
+ exit 1
+fi
+
+# randomly set up openSSL with dynamically/statically linked libraries
+OPENSSL_LINKAGE=$(if (( RANDOM % 2 )) ; then echo "dynamic"; else echo "static"; fi)
+echo "Executing test with ${OPENSSL_LINKAGE} openSSL linkage (random selection between 'dynamic' and 'static')"
+
+set_conf_ssl "mutual" "OPENSSL" "${OPENSSL_LINKAGE}"
+set_config_key "metrics.fetcher.update-interval" "2000"
+# this test relies on global failovers
+set_config_key "jobmanager.execution.failover-strategy" "full"
+
+TEST_PROGRAM_JAR="${END_TO_END_DIR}/flink-file-sink-test/target/FileSinkProgram.jar"
+
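The out_cleanup registration above goes through the on_exit helper provided by common.sh. Its implementation is not part of this diff; a minimal sketch of how such a trap-based helper typically looks (an assumption, not the actual common.sh code):

    # registers cleanup callbacks to run when the script exits
    # (sketch only; the real helper lives in common.sh)
    _EXIT_CALLBACKS=()
    function on_exit {
      _EXIT_CALLBACKS+=("$1")
    }
    function _run_exit_callbacks {
      local cb
      for cb in "${_EXIT_CALLBACKS[@]}"; do "$cb"; done
    }
    trap _run_exit_callbacks EXIT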
###################################
# Waits until a number of values have been written within a timeout.
# If the timeout expires, exit with return code 1.