aljoscha commented on a change in pull request #9333: [FLINK-9782][e2e] Harden bucketing sink e2e test.
URL: https://github.com/apache/flink/pull/9333#discussion_r311014106
##########
File path: flink-end-to-end-tests/test-scripts/test_streaming_bucketing.sh
##########
@@ -20,72 +20,136 @@
# Pull in the shared e2e helpers that live next to this script.
source "$(dirname "$0")/common.sh"

# Test program jar, job output directory, and the Flink log directory
# (the latter is scanned by truncate_files_with_valid_data).
TEST_PROGRAM_JAR="${END_TO_END_DIR}/flink-bucketing-sink-test/target/BucketingSinkTestProgram.jar"
JOB_OUTPUT_DIR="${TEST_DATA_DIR}/out/result"
LOG_DIR="${FLINK_DIR}/log"

#######################################
# Counts the lines currently written by the bucketing sink below
# ${TEST_DATA_DIR}/out.
#
# Pending and in-progress files are counted as well as "part-*" files.
# This assumes that pending files contain valid data: close() cannot move
# files to the FINAL state and only moves them to PENDING, and the test job
# uses a bucket size of Long.MAX. Other files (e.g. "*.valid-length" markers)
# are ignored.
# Globals:
#   TEST_DATA_DIR (read)
# Outputs:
#   Writes the line count, as printed by `wc -l`, to stdout.
#######################################
function get_total_number_of_valid_lines {
  # sort terminates every line with a newline (including a final line that
  # lacked one), so wc -l also counts an unterminated last line.
  find "${TEST_DATA_DIR}/out" -type f \
    \( -iname "*.pending" -o -iname "*.in-progress" -o -iname "part-*" \) \
    -exec cat {} + | sort -g | wc -l
}

#######################################
# Polls the sink output until it holds the expected number of lines, and
# aborts the script once the polling timeout has been reached.
#
# Each round truncates bucket files to their logged valid length, sleeps for
# 5 seconds, then recounts. A progress line is printed whenever the count
# differs from the last reported one.
# Arguments:
#   $1 - expected number of output lines
#   $2 - polling timeout in seconds
# Outputs:
#   Progress / failure messages on stdout.
# Returns:
#   0 once the expected count is reached; calls `exit 1` on timeout.
#######################################
function wait_for_complete_result {
  local expected=$1
  local timeout=$2
  local -r interval=5
  local waited=0
  local current=0
  local last_reported=-1

  until (( current >= expected )); do
    # Guard clause: give up once the time budget is spent.
    if (( waited >= timeout )); then
      echo "Did not produce expected number of values within ${timeout}s"
      exit 1
    fi

    truncate_files_with_valid_data

    sleep "${interval}"
    waited=$(( waited + interval ))

    current=$(get_total_number_of_valid_lines)
    # Only report progress when the count has changed.
    if (( last_reported != current )); then
      echo "Number of produced values ${current}/${expected}"
      last_reported=${current}
    fi
  done
}

#######################################
# Truncates bucket files to the valid length recorded by BucketingSink.
#
# Scans every file under ${LOG_DIR} for lines such as
#   "xxx xxx DEBUG xxx.BucketingSink - Writing valid-length file for
#    xxx/out/result8/part-0-0 to specify valid length 74994"
# and cuts each referenced, existing part file down to its first
# <valid length> bytes. Entries are applied in grep output order. If the same
# part file is logged more than once, the last matching entry wins.
# Re-running the function is harmless: it is called on every polling round.
# Globals:
#   LOG_DIR (read)
# Outputs:
#   Progress messages on stdout.
#######################################
function truncate_files_with_valid_data() {
  # Match on the message text instead of whitespace-separated field
  # positions, so extraction does not depend on the log layout prefix or on
  # spaces in log / bucket paths.
  local -r valid_length_re='Writing valid-length file for (.+) to specify valid length ([0-9]+)'
  local log_line part length

  echo "Truncating buckets"

  # Process substitution (not `var=$(grep ...)`) so that "no match" (grep
  # exit status 1) cannot abort the script when `set -e` is active.
  while IFS= read -r log_line; do
    [[ ${log_line} =~ ${valid_length_re} ]] || continue
    part=${BASH_REMATCH[1]}
    length=${BASH_REMATCH[2]}

    # The logged file may not exist (anymore); nothing to truncate then.
    [[ -f ${part} ]] || continue

    echo "TRUNCATING ${part}"

    # head -c copies exactly ${length} bytes (or the whole file if shorter),
    # unlike `dd bs=N count=1`, which needs an N-byte buffer and may stop
    # after a short read. mv then replaces the original file.
    if head -c "${length}" -- "${part}" > "${part}.truncated"; then
      mv -- "${part}.truncated" "${part}"
    else
      rm -f -- "${part}.truncated"
      echo "Failed to truncate ${part}"
    fi
  done < <(grep -rhF -- 'Writing valid-length file' "${LOG_DIR}")
}

#######################################
# Test cleanup: calls stop_cluster (defined outside this script; presumably
# in common.sh, so confirm), then runs "taskmanager.sh stop-all" from the
# Flink distribution.
# Globals:
#   FLINK_DIR (read)
#######################################
function bucketing_cleanup() {
  stop_cluster
  # Quoted so that a FLINK_DIR containing spaces still resolves to the script.
  "${FLINK_DIR}/bin/taskmanager.sh" stop-all
}

# Fix the necessary configuration parameters: enable SSL, then set the
# heartbeat timeout.
# NOTE(review): the reason for heartbeat.timeout=70000 (presumably
# milliseconds) is not documented here; confirm before relying on it.
set_conf_ssl
set_config_key heartbeat.timeout 70000
Review comment:
Double-check why this value (heartbeat.timeout = 70000) is set the way it is.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services