This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
     new 25bef37  [FLINK-9782][e2e] Harden bucketing sink e2e test.
25bef37 is described below

commit 25bef379a21c1c41879baa028e5d8a3634476fc5
Author: Kostas Kloudas <[email protected]>
AuthorDate: Fri Jul 19 09:20:11 2019 +0200

    [FLINK-9782][e2e] Harden bucketing sink e2e test.
---
 .../streaming/tests/BucketingSinkTestProgram.java  |  19 ++-
 flink-end-to-end-tests/test-scripts/common.sh      |   6 +-
 .../test-scripts/test_streaming_bucketing.sh       | 128 +++++++++++++++------
 3 files changed, 113 insertions(+), 40 deletions(-)

diff --git a/flink-end-to-end-tests/flink-bucketing-sink-test/src/main/java/org/apache/flink/streaming/tests/BucketingSinkTestProgram.java b/flink-end-to-end-tests/flink-bucketing-sink-test/src/main/java/org/apache/flink/streaming/tests/BucketingSinkTestProgram.java
index 26d7e91..a6484e5 100644
--- a/flink-end-to-end-tests/flink-bucketing-sink-test/src/main/java/org/apache/flink/streaming/tests/BucketingSinkTestProgram.java
+++ b/flink-end-to-end-tests/flink-bucketing-sink-test/src/main/java/org/apache/flink/streaming/tests/BucketingSinkTestProgram.java
@@ -62,7 +62,7 @@ public class BucketingSinkTestProgram {
 
                 StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
                sEnv.setRestartStrategy(RestartStrategies.fixedDelayRestart(
-                               3,
+                               10,
                                Time.of(10, TimeUnit.SECONDS)
                        ));
                sEnv.enableCheckpointing(4000);
@@ -71,7 +71,11 @@ public class BucketingSinkTestProgram {
 
                // define bucketing sink to emit the result
                 BucketingSink<Tuple4<Integer, Long, Integer, String>> sink = new BucketingSink<Tuple4<Integer, Long, Integer, String>>(outputPath)
-                       .setBucketer(new KeyBucketer());
+                               .setBucketer(new KeyBucketer())
+                               .setBatchSize(Long.MAX_VALUE)
+                               .setBatchRolloverInterval(Long.MAX_VALUE)
+                               .setInactiveBucketCheckInterval(Long.MAX_VALUE)
+                               .setInactiveBucketThreshold(Long.MAX_VALUE);
 
                // generate data, shuffle, perform stateful operation, sink
                sEnv.addSource(new Generator(10, idlenessMs, 60))
@@ -143,6 +147,7 @@ public class BucketingSinkTestProgram {
                private final int durationMs;
 
                private long ms = 0;
+               private volatile boolean canceled = false;
 
                 public Generator(int numKeys, int idlenessMs, int durationSeconds) {
                        this.numKeys = numKeys;
@@ -152,7 +157,7 @@ public class BucketingSinkTestProgram {
 
                @Override
                 public void run(SourceContext<Tuple3<Integer, Long, String>> ctx) throws Exception {
-                       while (ms < durationMs) {
+                       while (ms < durationMs && !canceled) {
                                synchronized (ctx.getCheckpointLock()) {
                                        for (int i = 0; i < numKeys; i++) {
                                                 ctx.collect(Tuple3.of(i, ms, "Some payload..."));
@@ -161,10 +166,16 @@ public class BucketingSinkTestProgram {
                                }
                                Thread.sleep(idlenessMs);
                        }
+
+                       while (!canceled) {
+                               Thread.sleep(50);
+                       }
                }
 
                @Override
-               public void cancel() { }
+               public void cancel() {
+                       canceled = true;
+               }
 
                @Override
                 public List<Long> snapshotState(long checkpointId, long timestamp) {
diff --git a/flink-end-to-end-tests/test-scripts/common.sh b/flink-end-to-end-tests/test-scripts/common.sh
index 7ed17f0..0191a73 100644
--- a/flink-end-to-end-tests/test-scripts/common.sh
+++ b/flink-end-to-end-tests/test-scripts/common.sh
@@ -553,9 +553,9 @@ function kill_all {
 }
 
 function kill_random_taskmanager {
-  KILL_TM=$(jps | grep "TaskManager" | sort -R | head -n 1 | awk '{print $1}')
-  kill -9 "$KILL_TM"
-  echo "TaskManager $KILL_TM killed."
+  local pid=`jps | grep -E "TaskManagerRunner|TaskManager" | sort -R | head -n 1 | cut -d " " -f 1 || true`
+  kill -9 "$pid"
+  echo "TaskManager $pid killed."
 }
 
 function setup_flink_slf4j_metric_reporter() {
diff --git a/flink-end-to-end-tests/test-scripts/test_streaming_bucketing.sh b/flink-end-to-end-tests/test-scripts/test_streaming_bucketing.sh
index 2ecea72..4a95f09 100755
--- a/flink-end-to-end-tests/test-scripts/test_streaming_bucketing.sh
+++ b/flink-end-to-end-tests/test-scripts/test_streaming_bucketing.sh
@@ -20,72 +20,134 @@
 source "$(dirname "$0")"/common.sh
 
 TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-bucketing-sink-test/target/BucketingSinkTestProgram.jar
+JOB_OUTPUT_DIR=${TEST_DATA_DIR}/out/result
+LOG_DIR=${FLINK_DIR}/log
+
+function get_total_number_of_valid_lines {
+  # this method assumes that pending files contain valid data.
+  # That is because close() cannot move files to FINAL state but moves them to PENDING.
+  # Given this, the job of the test has bucket size = Long.MAX
+  find ${TEST_DATA_DIR}/out -type f \( -iname "*.pending" -or -iname "*.in-progress" -or -iname "part-*" \) -exec cat {} + | sort -g | wc -l
+}
+
+function wait_for_complete_result {
+    local expected_number_of_values=$1
+    local polling_timeout=$2
+    local polling_interval=5
+    local seconds_elapsed=0
+
+    local number_of_values=0
+    local previous_number_of_values=-1
+
+    while [[ ${number_of_values} -lt ${expected_number_of_values} ]]; do
+        if [[ ${seconds_elapsed} -ge ${polling_timeout} ]]; then
+            echo "Did not produce expected number of values within ${polling_timeout}s"
+            exit 1
+        fi
+
+        truncate_files_with_valid_data
+
+        sleep ${polling_interval}
+        ((seconds_elapsed += ${polling_interval}))
+
+        number_of_values=$(get_total_number_of_valid_lines)
+        if [[ ${previous_number_of_values} -ne ${number_of_values} ]]; then
+            echo "Number of produced values ${number_of_values}/${expected_number_of_values}"
+            previous_number_of_values=${number_of_values}
+        fi
+    done
+}
+
+function truncate_files_with_valid_data() {
+  # get truncate information
+  # e.g. "xxx xxx DEBUG xxx.BucketingSink  - Writing valid-length file for xxx/out/result8/part-0-0 to specify valid length 74994"
+  LOG_LINES=$(grep -rnw $LOG_DIR -e 'Writing valid-length file')
+
+  # perform truncate on every line
+  echo "Truncating buckets"
+
+  while read -r LOG_LINE; do
+    PART=$(echo "$LOG_LINE" | awk '{ print $10 }' FS=" ")
+    LENGTH=$(echo "$LOG_LINE" | awk '{ print $15 }' FS=" ")
+
+    if [[ -z "${PART}" ]]; then
+        continue
+    fi
+    re='^[0-9]+$'
+    if ! [[ ${LENGTH}  =~ $re ]]; then
+        continue
+    fi
+
+    dd if=$PART of="$PART.truncated" bs=$LENGTH count=1 >& /dev/null
+    rm $PART
+    mv "$PART.truncated" $PART
+  done <<< "$LOG_LINES"
+}
+
+function bucketing_cleanup() {
+  stop_cluster
+  $FLINK_DIR/bin/taskmanager.sh stop-all
+}
+
+# Fix the necessary configuration parameters.
+
+set_conf_ssl
+set_config_key "heartbeat.timeout" "20000"
 
 # enable DEBUG logging level for the BucketingSink to retrieve truncate length later
 echo "" >> $FLINK_DIR/conf/log4j.properties
 echo "log4j.logger.org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink=DEBUG" >> $FLINK_DIR/conf/log4j.properties
 
-set_conf_ssl
+# Start the experiment.
+
 start_cluster
 $FLINK_DIR/bin/taskmanager.sh start
 $FLINK_DIR/bin/taskmanager.sh start
 $FLINK_DIR/bin/taskmanager.sh start
 
-function bucketing_cleanup() {
+wait_for_number_of_running_tms 4
 
-  stop_cluster
-  $FLINK_DIR/bin/taskmanager.sh stop-all
-}
 on_exit bucketing_cleanup
 
-JOB_ID=$($FLINK_DIR/bin/flink run -d -p 4 $TEST_PROGRAM_JAR -outputPath $TEST_DATA_DIR/out/result \
+JOB_ID=$($FLINK_DIR/bin/flink run -d -p 4 $TEST_PROGRAM_JAR -outputPath $JOB_OUTPUT_DIR \
   | grep "Job has been submitted with JobID" | sed 's/.* //g')
 
 wait_job_running ${JOB_ID}
 
 wait_num_checkpoints "${JOB_ID}" 5
 
-echo "Killing TM"
-
-# kill task manager
+echo "Killing 1 TM"
 kill_random_taskmanager
+wait_for_number_of_running_tms 3
 
-echo "Starting TM"
-
-# start task manager again
+echo "Restarting 1 TM"
 $FLINK_DIR/bin/taskmanager.sh start
+wait_for_number_of_running_tms 4
 
-echo "Killing 2 TMs"
+sleep 10
 
-# kill two task managers again shortly after
+echo "Killing 2 TMs"
 kill_random_taskmanager
 kill_random_taskmanager
+wait_for_number_of_running_tms 2
 
-echo "Starting 2 TMs and waiting for successful completion"
-
-# start task manager again and let job finish
+echo "Starting 2 TMs"
 $FLINK_DIR/bin/taskmanager.sh start
 $FLINK_DIR/bin/taskmanager.sh start
+wait_for_number_of_running_tms 4
 
-# the job should complete in under 60s because half of the work has been checkpointed
-wait_job_terminal_state "${JOB_ID}" "FINISHED"
-
-# get truncate information
-# e.g. "xxx xxx DEBUG xxx.BucketingSink  - Writing valid-length file for xxx/out/result8/part-0-0 to specify valid length 74994"
-LOG_LINES=$(grep -rnw $FLINK_DIR/log -e 'Writing valid-length file')
+# This guarantees that the initializeState() is called
+# before we start counting valid lines.
+# In other case we risk of counting invalid lines as valid ones.
+wait_job_running ${JOB_ID}
 
-# perform truncate on every line
-echo "Truncating buckets"
-while read -r LOG_LINE; do
-  PART=$(echo "$LOG_LINE" | awk '{ print $10 }' FS=" ")
-  LENGTH=$(echo "$LOG_LINE" | awk '{ print $15 }' FS=" ")
+echo "Waiting until all values have been produced"
+wait_for_complete_result 60000 900
 
-  echo "Truncating $PART to $LENGTH"
+cancel_job $JOB_ID
+wait_job_terminal_state ${JOB_ID} "CANCELED"
 
-  dd if=$PART of="$PART.truncated" bs=$LENGTH count=1
-  rm $PART
-  mv "$PART.truncated" $PART
-done <<< "$LOG_LINES"
+echo "Job $JOB_ID was cancelled, time to verify"
 
 # get all lines in pending or part files
 find ${TEST_DATA_DIR}/out -type f \( -iname "*.pending" -or -iname "part-*" \) -exec cat {} + > ${TEST_DATA_DIR}/complete_result

Reply via email to