This is an automated email from the ASF dual-hosted git repository.
mapohl pushed a commit to branch release-1.18
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.18 by this push:
new a4dd58545d5 Revert "[FLINK-34324][test] Makes all s3 related
operations being declared and called in a single location"
a4dd58545d5 is described below
commit a4dd58545d59b59089d9321a743d6c98a7c8e855
Author: Matthias Pohl <[email protected]>
AuthorDate: Tue Feb 6 17:32:22 2024 +0100
Revert "[FLINK-34324][test] Makes all s3 related operations being declared
and called in a single location"
This reverts commit a15515ebc0e4c59ea0642e745e942591c28b3a3c.
---
.../test-scripts/test_file_sink.sh | 111 ++++++++++-----------
1 file changed, 52 insertions(+), 59 deletions(-)
diff --git a/flink-end-to-end-tests/test-scripts/test_file_sink.sh
b/flink-end-to-end-tests/test-scripts/test_file_sink.sh
index 5ed1fda2c68..711f74b6672 100755
--- a/flink-end-to-end-tests/test-scripts/test_file_sink.sh
+++ b/flink-end-to-end-tests/test-scripts/test_file_sink.sh
@@ -20,16 +20,53 @@
OUT_TYPE="${1:-local}" # other type: s3
SINK_TO_TEST="${2:-"StreamingFileSink"}"
+S3_PREFIX=temp/test_file_sink-$(uuidgen)
+OUTPUT_PATH="$TEST_DATA_DIR/$S3_PREFIX"
+S3_OUTPUT_PATH="s3://$IT_CASE_S3_BUCKET/$S3_PREFIX"
source "$(dirname "$0")"/common.sh
-# OUTPUT_PATH is a local folder that can be used as a download folder for
remote data
-# the helper functions will access this folder
-RANDOM_PREFIX="temp/test_file_sink-$(uuidgen)"
-OUTPUT_PATH="$TEST_DATA_DIR/${RANDOM_PREFIX}"
+if [ "${OUT_TYPE}" == "s3" ]; then
+ source "$(dirname "$0")"/common_s3.sh
+else
+ echo "S3 environment is not loaded for non-s3 test runs (test run type:
$OUT_TYPE)."
+fi
+
+# randomly set up openSSL with dynamically/statically linked libraries
+OPENSSL_LINKAGE=$(if (( RANDOM % 2 )) ; then echo "dynamic"; else echo
"static"; fi)
+echo "Executing test with ${OPENSSL_LINKAGE} openSSL linkage (random selection
between 'dynamic' and 'static')"
+
+s3_setup hadoop
+set_conf_ssl "mutual" "OPENSSL" "${OPENSSL_LINKAGE}"
+set_config_key "metrics.fetcher.update-interval" "2000"
+# this test relies on global failovers
+set_config_key "jobmanager.execution.failover-strategy" "full"
+
mkdir -p $OUTPUT_PATH
-# JOB_OUTPUT_PATH is the location where the job writes its data to
-JOB_OUTPUT_PATH="${OUTPUT_PATH}"
+if [ "${OUT_TYPE}" == "local" ]; then
+ echo "Use local output"
+ JOB_OUTPUT_PATH=${OUTPUT_PATH}
+elif [ "${OUT_TYPE}" == "s3" ]; then
+ echo "Use s3 output"
+ JOB_OUTPUT_PATH=${S3_OUTPUT_PATH}
+ set_config_key "state.checkpoints.dir"
"s3://$IT_CASE_S3_BUCKET/$S3_PREFIX-chk"
+ mkdir -p "$OUTPUT_PATH-chk"
+else
+ echo "Unknown output type: ${OUT_TYPE}"
+ exit 1
+fi
+
+# make sure we delete the file at the end
+function out_cleanup {
+ s3_delete_by_full_path_prefix "$S3_PREFIX"
+ s3_delete_by_full_path_prefix "${S3_PREFIX}-chk"
+ rollback_openssl_lib
+}
+if [ "${OUT_TYPE}" == "s3" ]; then
+ on_exit out_cleanup
+fi
+
+TEST_PROGRAM_JAR="${END_TO_END_DIR}/flink-file-sink-test/target/FileSinkProgram.jar"
###################################
# Get all lines in part files and sort them numerically.
@@ -42,6 +79,9 @@ JOB_OUTPUT_PATH="${OUTPUT_PATH}"
# sorted content of part files
###################################
function get_complete_result {
+ if [ "${OUT_TYPE}" == "s3" ]; then
+ s3_get_by_full_path_and_filename_prefix "$OUTPUT_PATH" "$S3_PREFIX"
"part-" true
+ fi
find "${OUTPUT_PATH}" -type f \( -iname "part-*" \) -exec cat {} + | sort -g
}
@@ -49,67 +89,20 @@ function get_complete_result {
# Get total number of lines in part files.
#
# Globals:
-# OUTPUT_PATH
+# S3_PREFIX
# Arguments:
# None
# Returns:
# line number in part files
###################################
function get_total_number_of_valid_lines {
- get_complete_result | wc -l | tr -d '[:space:]'
+ if [ "${OUT_TYPE}" == "local" ]; then
+ get_complete_result | wc -l | tr -d '[:space:]'
+ elif [ "${OUT_TYPE}" == "s3" ]; then
+ s3_get_number_of_lines_by_prefix "${S3_PREFIX}" "part-"
+ fi
}
-if [ "${OUT_TYPE}" == "local" ]; then
- echo "[INFO] Test run in local environment: No S3 environment is not loaded."
-elif [ "${OUT_TYPE}" == "s3" ]; then
- # the s3 context requires additional
- source "$(dirname "$0")"/common_s3.sh
- s3_setup hadoop
-
- # overwrites JOB_OUTPUT_PATH to point to S3
- S3_DATA_PREFIX="${RANDOM_PREFIX}"
- S3_CHECKPOINT_PREFIX="${RANDOM_PREFIX}-chk"
- JOB_OUTPUT_PATH="s3://$IT_CASE_S3_BUCKET/${S3_DATA_PREFIX}"
- set_config_key "state.checkpoints.dir"
"s3://$IT_CASE_S3_BUCKET/${S3_CHECKPOINT_PREFIX}"
-
- # overwrites implementation for local runs
- function get_complete_result {
- # copies the data from S3 to the local OUTPUT_PATH
- s3_get_by_full_path_and_filename_prefix "$OUTPUT_PATH"
"$FILE_SINK_TEST_TEMP_SUBFOLDER" "part-" true
-
- # and prints the sorted output
- find "${OUTPUT_PATH}" -type f \( -iname "part-*" \) -exec cat {} + | sort
-g
- }
-
- # overwrites implementation for local runs
- function get_total_number_of_valid_lines {
- s3_get_number_of_lines_by_prefix "${FILE_SINK_TEST_TEMP_SUBFOLDER}" "part-"
- }
-
- # make sure we delete the file at the end
- function out_cleanup {
- s3_delete_by_full_path_prefix "${S3_DATA_PREFIX}"
- s3_delete_by_full_path_prefix "${S3_CHECKPOINT_PREFIX}"
- rollback_openssl_lib
- }
-
- on_exit out_cleanup
-else
- echo "[ERROR] Unknown out type: ${OUT_TYPE}"
- exit 1
-fi
-
-# randomly set up openSSL with dynamically/statically linked libraries
-OPENSSL_LINKAGE=$(if (( RANDOM % 2 )) ; then echo "dynamic"; else echo
"static"; fi)
-echo "Executing test with ${OPENSSL_LINKAGE} openSSL linkage (random selection
between 'dynamic' and 'static')"
-
-set_conf_ssl "mutual" "OPENSSL" "${OPENSSL_LINKAGE}"
-set_config_key "metrics.fetcher.update-interval" "2000"
-# this test relies on global failovers
-set_config_key "jobmanager.execution.failover-strategy" "full"
-
-TEST_PROGRAM_JAR="${END_TO_END_DIR}/flink-file-sink-test/target/FileSinkProgram.jar"
-
###################################
# Waits until a number of values have been written within a timeout.
# If the timeout expires, exit with return code 1.