This is an automated email from the ASF dual-hosted git repository.
kkloudas pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.9 by this push:
new 25bef37 [FLINK-9782][e2e] Harden bucketing sink e2e test.
25bef37 is described below
commit 25bef379a21c1c41879baa028e5d8a3634476fc5
Author: Kostas Kloudas <[email protected]>
AuthorDate: Fri Jul 19 09:20:11 2019 +0200
[FLINK-9782][e2e] Harden bucketing sink e2e test.
---
.../streaming/tests/BucketingSinkTestProgram.java | 19 ++-
flink-end-to-end-tests/test-scripts/common.sh | 6 +-
.../test-scripts/test_streaming_bucketing.sh | 128 +++++++++++++++------
3 files changed, 113 insertions(+), 40 deletions(-)
diff --git a/flink-end-to-end-tests/flink-bucketing-sink-test/src/main/java/org/apache/flink/streaming/tests/BucketingSinkTestProgram.java b/flink-end-to-end-tests/flink-bucketing-sink-test/src/main/java/org/apache/flink/streaming/tests/BucketingSinkTestProgram.java
index 26d7e91..a6484e5 100644
--- a/flink-end-to-end-tests/flink-bucketing-sink-test/src/main/java/org/apache/flink/streaming/tests/BucketingSinkTestProgram.java
+++ b/flink-end-to-end-tests/flink-bucketing-sink-test/src/main/java/org/apache/flink/streaming/tests/BucketingSinkTestProgram.java
@@ -62,7 +62,7 @@ public class BucketingSinkTestProgram {
StreamExecutionEnvironment sEnv =
StreamExecutionEnvironment.getExecutionEnvironment();
sEnv.setRestartStrategy(RestartStrategies.fixedDelayRestart(
- 3,
+ 10,
Time.of(10, TimeUnit.SECONDS)
));
sEnv.enableCheckpointing(4000);
@@ -71,7 +71,11 @@ public class BucketingSinkTestProgram {
// define bucketing sink to emit the result
BucketingSink<Tuple4<Integer, Long, Integer, String>> sink =
new BucketingSink<Tuple4<Integer, Long, Integer, String>>(outputPath)
- .setBucketer(new KeyBucketer());
+ .setBucketer(new KeyBucketer())
+ .setBatchSize(Long.MAX_VALUE)
+ .setBatchRolloverInterval(Long.MAX_VALUE)
+ .setInactiveBucketCheckInterval(Long.MAX_VALUE)
+ .setInactiveBucketThreshold(Long.MAX_VALUE);
// generate data, shuffle, perform stateful operation, sink
sEnv.addSource(new Generator(10, idlenessMs, 60))
@@ -143,6 +147,7 @@ public class BucketingSinkTestProgram {
private final int durationMs;
private long ms = 0;
+ private volatile boolean canceled = false;
public Generator(int numKeys, int idlenessMs, int
durationSeconds) {
this.numKeys = numKeys;
@@ -152,7 +157,7 @@ public class BucketingSinkTestProgram {
@Override
public void run(SourceContext<Tuple3<Integer, Long, String>>
ctx) throws Exception {
- while (ms < durationMs) {
+ while (ms < durationMs && !canceled) {
synchronized (ctx.getCheckpointLock()) {
for (int i = 0; i < numKeys; i++) {
ctx.collect(Tuple3.of(i, ms,
"Some payload..."));
@@ -161,10 +166,16 @@ public class BucketingSinkTestProgram {
}
Thread.sleep(idlenessMs);
}
+
+ while (!canceled) {
+ Thread.sleep(50);
+ }
}
@Override
- public void cancel() { }
+ public void cancel() {
+ canceled = true;
+ }
@Override
public List<Long> snapshotState(long checkpointId, long
timestamp) {
diff --git a/flink-end-to-end-tests/test-scripts/common.sh b/flink-end-to-end-tests/test-scripts/common.sh
index 7ed17f0..0191a73 100644
--- a/flink-end-to-end-tests/test-scripts/common.sh
+++ b/flink-end-to-end-tests/test-scripts/common.sh
@@ -553,9 +553,9 @@ function kill_all {
}
function kill_random_taskmanager {
- KILL_TM=$(jps | grep "TaskManager" | sort -R | head -n 1 | awk '{print $1}')
- kill -9 "$KILL_TM"
- echo "TaskManager $KILL_TM killed."
+ local pid=`jps | grep -E "TaskManagerRunner|TaskManager" | sort -R | head -n
1 | cut -d " " -f 1 || true`
+ kill -9 "$pid"
+ echo "TaskManager $pid killed."
}
function setup_flink_slf4j_metric_reporter() {
diff --git a/flink-end-to-end-tests/test-scripts/test_streaming_bucketing.sh b/flink-end-to-end-tests/test-scripts/test_streaming_bucketing.sh
index 2ecea72..4a95f09 100755
--- a/flink-end-to-end-tests/test-scripts/test_streaming_bucketing.sh
+++ b/flink-end-to-end-tests/test-scripts/test_streaming_bucketing.sh
@@ -20,72 +20,134 @@
source "$(dirname "$0")"/common.sh
TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-bucketing-sink-test/target/BucketingSinkTestProgram.jar
+JOB_OUTPUT_DIR=${TEST_DATA_DIR}/out/result
+LOG_DIR=${FLINK_DIR}/log
+
+function get_total_number_of_valid_lines {
+ # this method assumes that pending files contain valid data.
+ # That is because close() cannot move files to FINAL state but moves them to
PENDING.
+ # Given this, the job of the test has bucket size = Long.MAX
+ find ${TEST_DATA_DIR}/out -type f \( -iname "*.pending" -or -iname
"*.in-progress" -or -iname "part-*" \) -exec cat {} + | sort -g | wc -l
+}
+
+function wait_for_complete_result {
+ local expected_number_of_values=$1
+ local polling_timeout=$2
+ local polling_interval=5
+ local seconds_elapsed=0
+
+ local number_of_values=0
+ local previous_number_of_values=-1
+
+ while [[ ${number_of_values} -lt ${expected_number_of_values} ]]; do
+ if [[ ${seconds_elapsed} -ge ${polling_timeout} ]]; then
+ echo "Did not produce expected number of values within
${polling_timeout}s"
+ exit 1
+ fi
+
+ truncate_files_with_valid_data
+
+ sleep ${polling_interval}
+ ((seconds_elapsed += ${polling_interval}))
+
+ number_of_values=$(get_total_number_of_valid_lines)
+ if [[ ${previous_number_of_values} -ne ${number_of_values} ]]; then
+ echo "Number of produced values
${number_of_values}/${expected_number_of_values}"
+ previous_number_of_values=${number_of_values}
+ fi
+ done
+}
+
+function truncate_files_with_valid_data() {
+ # get truncate information
+ # e.g. "xxx xxx DEBUG xxx.BucketingSink - Writing valid-length file for
xxx/out/result8/part-0-0 to specify valid length 74994"
+ LOG_LINES=$(grep -rnw $LOG_DIR -e 'Writing valid-length file')
+
+ # perform truncate on every line
+ echo "Truncating buckets"
+
+ while read -r LOG_LINE; do
+ PART=$(echo "$LOG_LINE" | awk '{ print $10 }' FS=" ")
+ LENGTH=$(echo "$LOG_LINE" | awk '{ print $15 }' FS=" ")
+
+ if [[ -z "${PART}" ]]; then
+ continue
+ fi
+ re='^[0-9]+$'
+ if ! [[ ${LENGTH} =~ $re ]]; then
+ continue
+ fi
+
+ dd if=$PART of="$PART.truncated" bs=$LENGTH count=1 >& /dev/null
+ rm $PART
+ mv "$PART.truncated" $PART
+ done <<< "$LOG_LINES"
+}
+
+function bucketing_cleanup() {
+ stop_cluster
+ $FLINK_DIR/bin/taskmanager.sh stop-all
+}
+
+# Fix the necessary configuration parameters.
+
+set_conf_ssl
+set_config_key "heartbeat.timeout" "20000"
# enable DEBUG logging level for the BucketingSink to retrieve truncate length
later
echo "" >> $FLINK_DIR/conf/log4j.properties
echo
"log4j.logger.org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink=DEBUG"
>> $FLINK_DIR/conf/log4j.properties
-set_conf_ssl
+# Start the experiment.
+
start_cluster
$FLINK_DIR/bin/taskmanager.sh start
$FLINK_DIR/bin/taskmanager.sh start
$FLINK_DIR/bin/taskmanager.sh start
-function bucketing_cleanup() {
+wait_for_number_of_running_tms 4
- stop_cluster
- $FLINK_DIR/bin/taskmanager.sh stop-all
-}
on_exit bucketing_cleanup
-JOB_ID=$($FLINK_DIR/bin/flink run -d -p 4 $TEST_PROGRAM_JAR -outputPath
$TEST_DATA_DIR/out/result \
+JOB_ID=$($FLINK_DIR/bin/flink run -d -p 4 $TEST_PROGRAM_JAR -outputPath
$JOB_OUTPUT_DIR \
| grep "Job has been submitted with JobID" | sed 's/.* //g')
wait_job_running ${JOB_ID}
wait_num_checkpoints "${JOB_ID}" 5
-echo "Killing TM"
-
-# kill task manager
+echo "Killing 1 TM"
kill_random_taskmanager
+wait_for_number_of_running_tms 3
-echo "Starting TM"
-
-# start task manager again
+echo "Restarting 1 TM"
$FLINK_DIR/bin/taskmanager.sh start
+wait_for_number_of_running_tms 4
-echo "Killing 2 TMs"
+sleep 10
-# kill two task managers again shortly after
+echo "Killing 2 TMs"
kill_random_taskmanager
kill_random_taskmanager
+wait_for_number_of_running_tms 2
-echo "Starting 2 TMs and waiting for successful completion"
-
-# start task manager again and let job finish
+echo "Starting 2 TMs"
$FLINK_DIR/bin/taskmanager.sh start
$FLINK_DIR/bin/taskmanager.sh start
+wait_for_number_of_running_tms 4
-# the job should complete in under 60s because half of the work has been
checkpointed
-wait_job_terminal_state "${JOB_ID}" "FINISHED"
-
-# get truncate information
-# e.g. "xxx xxx DEBUG xxx.BucketingSink - Writing valid-length file for
xxx/out/result8/part-0-0 to specify valid length 74994"
-LOG_LINES=$(grep -rnw $FLINK_DIR/log -e 'Writing valid-length file')
+# This guarantees that the initializeState() is called
+# before we start counting valid lines.
+# In other case we risk of counting invalid lines as valid ones.
+wait_job_running ${JOB_ID}
-# perform truncate on every line
-echo "Truncating buckets"
-while read -r LOG_LINE; do
- PART=$(echo "$LOG_LINE" | awk '{ print $10 }' FS=" ")
- LENGTH=$(echo "$LOG_LINE" | awk '{ print $15 }' FS=" ")
+echo "Waiting until all values have been produced"
+wait_for_complete_result 60000 900
- echo "Truncating $PART to $LENGTH"
+cancel_job $JOB_ID
+wait_job_terminal_state ${JOB_ID} "CANCELED"
- dd if=$PART of="$PART.truncated" bs=$LENGTH count=1
- rm $PART
- mv "$PART.truncated" $PART
-done <<< "$LOG_LINES"
+echo "Job $JOB_ID was cancelled, time to verify"
# get all lines in pending or part files
find ${TEST_DATA_DIR}/out -type f \( -iname "*.pending" -or -iname "part-*" \)
-exec cat {} + > ${TEST_DATA_DIR}/complete_result