[
https://issues.apache.org/jira/browse/FLINK-9902?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16550520#comment-16550520
]
ASF GitHub Bot commented on FLINK-9902:
---------------------------------------
Github user aljoscha commented on a diff in the pull request:
https://github.com/apache/flink/pull/6376#discussion_r203973793
--- Diff:
flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/FailingSource.java
---
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing.utils;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Source for window checkpointing IT cases that can introduce artificial
failures.
+ */
+public class FailingSource extends RichSourceFunction<Tuple2<Long,
IntType>>
+ implements ListCheckpointed<Integer>, CheckpointListener {
+
+ /**
+ * Function to generate and emit the test events (and watermarks if
required).
+ */
+ @FunctionalInterface
+ public interface EventEmittingGenerator extends Serializable {
+ void emitEvent(SourceContext<Tuple2<Long, IntType>> ctx, int
eventSequenceNo);
+ }
+
+ private static final long INITIAL = Long.MIN_VALUE;
+ private static final long STATEFUL_CHECKPOINT_COMPLETED =
Long.MIN_VALUE;
+
+ @Nonnull
+ private final EventEmittingGenerator eventEmittingGenerator;
+ private final int expectedEmitCalls;
+ private final int failureAfterNumElements;
+ private final boolean usingProcessingTime;
+ private final AtomicLong checkpointStatus;
+
+ private int emitCallCount;
+ private volatile boolean running;
+
+ public FailingSource(
+ @Nonnull EventEmittingGenerator eventEmittingGenerator,
+ @Nonnegative int numberOfGeneratorInvocations) {
+ this(eventEmittingGenerator, numberOfGeneratorInvocations,
TimeCharacteristic.EventTime);
+ }
+
+ public FailingSource(
+ @Nonnull EventEmittingGenerator eventEmittingGenerator,
+ @Nonnegative int numberOfGeneratorInvocations,
+ @Nonnull TimeCharacteristic timeCharacteristic) {
+ this.eventEmittingGenerator = eventEmittingGenerator;
+ this.running = true;
+ this.emitCallCount = 0;
+ this.expectedEmitCalls = numberOfGeneratorInvocations;
+ this.failureAfterNumElements = numberOfGeneratorInvocations / 2;
+ this.checkpointStatus = new AtomicLong(INITIAL);
+ this.usingProcessingTime = timeCharacteristic ==
TimeCharacteristic.ProcessingTime;
+ }
+
+ @Override
+ public void open(Configuration parameters) {
+ // non-parallel source
+ assertEquals(1,
getRuntimeContext().getNumberOfParallelSubtasks());
+ }
+
+ @Override
+ public void run(SourceContext<Tuple2<Long, IntType>> ctx) throws
Exception {
+
+ final RuntimeContext runtimeContext = getRuntimeContext();
+ // detect if this task is "the chosen one" and should fail (via
subtaskidx), if it did not fail before (via attempt)
+ final boolean failThisTask =
+ runtimeContext.getAttemptNumber() == 0 &&
runtimeContext.getIndexOfThisSubtask() == 0;
+
+ // we loop longer than we have elements, to permit delayed
checkpoints
+ // to still cause a failure
+ while (running) {
+
+ // the function failed before, or we are in the
elements before the failure
+ synchronized (ctx.getCheckpointLock()) {
+ eventEmittingGenerator.emitEvent(ctx,
emitCallCount++);
+ running &= (emitCallCount < expectedEmitCalls);
+ }
+
+ if (emitCallCount < failureAfterNumElements) {
+ Thread.sleep(1);
+ } else if (failThisTask && emitCallCount ==
failureAfterNumElements) {
+ // wait for a pending checkpoint that fulfills
our requirements if needed
+ while (checkpointStatus.get() !=
STATEFUL_CHECKPOINT_COMPLETED) {
+ Thread.sleep(1);
+ }
+ throw new Exception("Artificial Failure");
+ }
+ }
+
+ if (usingProcessingTime) {
+ while (true) {
--- End diff --
Maybe use `running` here?
> Improve and refactor window checkpointing IT cases
> --------------------------------------------------
>
> Key: FLINK-9902
> URL: https://issues.apache.org/jira/browse/FLINK-9902
> Project: Flink
> Issue Type: Test
> Components: State Backends, Checkpointing
> Affects Versions: 1.6.0
> Reporter: Stefan Richter
> Assignee: Stefan Richter
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.6.0, 1.7.0
>
>
> Windowing IT cases currently have a lot of duplicated code that could be
> unified and deduplicated. Furthermore, the test will also not fail on
> problems with timer snapshots because either there are no timers in the
> snapshot or all timers will still be re-inserted before they trigger. We can
> cover timers as well if we change this.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)