pnowojski commented on a change in pull request #15055:
URL: https://github.com/apache/flink/pull/15055#discussion_r661973155
##########
File path:
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
##########
@@ -1729,43 +1731,117 @@ protected void cancelTask() {
}
@Test
- public void testTriggeringCheckpointWithFinishedChannels() throws Exception {
- AtomicReference<Future<?>> lastCheckpointTriggerFuture = new AtomicReference<>();
+ public void testNotWaitingForAllRecordsProcessedIfCheckpointNotEnabled() throws Exception {
+ ResultPartitionWriter[] partitionWriters = new ResultPartitionWriter[2];
+ try {
+ for (int i = 0; i < partitionWriters.length; ++i) {
+ partitionWriters[i] =
+ PartitionTestUtils.createPartition(ResultPartitionType.PIPELINED_BOUNDED);
+ partitionWriters[i].setup();
+ }
- try (StreamTaskMailboxTestHarness<String> testHarness =
- new StreamTaskMailboxTestHarnessBuilder<>(
- env ->
- new HoldingOnAfterInvokeStreamTask(
- env, lastCheckpointTriggerFuture),
- BasicTypeInfo.STRING_TYPE_INFO)
- .addInput(BasicTypeInfo.STRING_TYPE_INFO, 3)
- .setupOutputForSingletonOperatorChain(new EmptyOperator())
- .build()) {
- // Tests triggering checkpoint when all the inputs are alive.
- Future<Boolean> checkpointFuture = triggerCheckpoint(testHarness, 2);
- processMailTillCheckpointSucceeds(testHarness, checkpointFuture);
- assertEquals(2, testHarness.getTaskStateManager().getReportedCheckpointId());
+ try (StreamTaskMailboxTestHarness<String> testHarness =
+ new StreamTaskMailboxTestHarnessBuilder<>(
+ OneInputStreamTask::new, STRING_TYPE_INFO)
+ .modifyStreamConfig(config -> config.setCheckpointingEnabled(false))
+ .addInput(STRING_TYPE_INFO)
+ .setupOperatorChain(new EmptyOperator())
+ .setNumberOfNonChainedOutputs(1 + partitionWriters.length)
+ .finishForSingletonOperatorChain(StringSerializer.INSTANCE)
+ .addAdditionalOutput(partitionWriters)
Review comment:
Can we avoid having a separate `setNumberOfNonChainedOutputs()` method in
addition to `addAdditionalOutput()`? Couldn't we have only
`addAdditionalOutput()`?
Maybe we could move `addAdditionalOutput()` to the `StreamConfigChainer`? (I
see this might be problematic, because `StreamConfigChainer` doesn't know the
type of the `OWNER`, so it wouldn't be able to pass the additional outputs to
the owner.)
Or maybe we could enforce that `addAdditionalOutput()` is called before the
`StreamConfigChainer` is created in
`StreamTaskMailboxTestHarnessBuilder#setupOperatorChain(org.apache.flink.runtime.jobgraph.OperatorID,
org.apache.flink.streaming.api.operators.StreamOperatorFactory<?>)`, and move
`setNumberOfNonChainedOutputs` to the `StreamConfigChainer`'s constructor? We
could then add a `checkState` in `addAdditionalOutput()` ensuring that it can
only be called before `setupOperatorChain()`. This would prevent
`setNumberOfNonChainedOutputs` from getting out of sync with
`addAdditionalOutput`.
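To illustrate the ordering idea, here is a minimal sketch. `HarnessBuilderSketch`
is a hypothetical, heavily simplified stand-in (not the real
`StreamTaskMailboxTestHarnessBuilder`), outputs are modelled as plain strings,
and a plain `IllegalStateException` stands in for `checkState`. The number of
non-chained outputs is derived once, when the chain is set up, so it cannot
drift from the registered additional outputs:

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified stand-in for the test harness builder; not Flink's actual class. */
public class HarnessBuilderSketch {
    // Additional outputs are modelled as strings purely for illustration.
    private final List<String> additionalOutputs = new ArrayList<>();
    private boolean operatorChainSetUp = false;
    private int numberOfNonChainedOutputs = -1;

    /** May only be called before setupOperatorChain(); otherwise it fails fast. */
    public HarnessBuilderSketch addAdditionalOutput(String... outputs) {
        if (operatorChainSetUp) {
            throw new IllegalStateException(
                    "addAdditionalOutput() must be called before setupOperatorChain()");
        }
        for (String output : outputs) {
            additionalOutputs.add(output);
        }
        return this;
    }

    /** Derives the output count from what was registered: one main output plus the extras. */
    public HarnessBuilderSketch setupOperatorChain() {
        operatorChainSetUp = true;
        numberOfNonChainedOutputs = 1 + additionalOutputs.size();
        return this;
    }

    public int getNumberOfNonChainedOutputs() {
        return numberOfNonChainedOutputs;
    }
}
```

With this shape the test would no longer need an explicit
`setNumberOfNonChainedOutputs(1 + partitionWriters.length)` call.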
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/CheckpointBarrierTracker.java
##########
@@ -212,14 +233,57 @@ public void processCancellationBarrier(CancelCheckpointMarker cancelBarrier)
}
@Override
- public void processEndOfPartition() throws IOException {
- while (!pendingCheckpoints.isEmpty()) {
- CheckpointBarrierCount barrierCount = pendingCheckpoints.removeFirst();
- if (barrierCount.markAborted()) {
- notifyAbort(
- barrierCount.checkpointId(),
- new CheckpointException(
- CheckpointFailureReason.CHECKPOINT_DECLINED_INPUT_END_OF_STREAM));
+ public void processEndOfPartition(InputChannelInfo channelInfo) throws IOException {
Review comment:
nit: isn't this method too long? Shouldn't it be split?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannel.java
##########
@@ -205,6 +205,12 @@ public void resumeConsumption() {
throw new UnsupportedOperationException("RecoveredInputChannel should never be blocked.");
}
+ @Override
+ public void acknowledgeAllRecordsProcessed() throws IOException {
+ throw new UnsupportedOperationException(
+ "RecoveredInputChannel should not need acknowledge all records processed.");
+ }
+
Review comment:
Is this the right thing to do? Or will it make unaligned checkpoints
unusable with the final checkpoint? What happens if `EndOfUserRecordsEvent` has
been emitted and sent downstream, but has not yet been processed, before the
unaligned checkpoint has completed? In that case we would recover
`EndOfUserRecordsEvent` via `RecoveredInputChannel` and end up here,
wouldn't we?
Shouldn't we forward this message to
`RemoteInputChannel`/`LocalInputChannel` after converting
`RecoveredInputChannel` into one of those?
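A rough sketch of the forwarding idea, as a toy model only. Every name below
(`RecoveredChannelSketch`, `LiveChannel`, `toLiveChannel()`) is hypothetical and
none of it is Flink's real channel API. Instead of throwing, the recovered
channel remembers that an acknowledgement was requested and replays it on the
channel it is converted into:

```java
/** Hypothetical toy model of deferring an acknowledgement; not Flink's actual classes. */
public class RecoveredChannelSketch {

    /** Minimal channel abstraction, assumed for this sketch. */
    public interface Channel {
        void acknowledgeAllRecordsProcessed();
    }

    /** Stands in for a remote/local channel that can really send the acknowledgement. */
    public static final class LiveChannel implements Channel {
        private int acknowledgementsSent = 0;

        @Override
        public void acknowledgeAllRecordsProcessed() {
            acknowledgementsSent++;
        }

        public int getAcknowledgementsSent() {
            return acknowledgementsSent;
        }
    }

    /** Stands in for the recovered channel: it records the request instead of throwing. */
    public static final class RecoveredChannel implements Channel {
        private boolean acknowledgementPending = false;

        @Override
        public void acknowledgeAllRecordsProcessed() {
            acknowledgementPending = true;
        }

        /** Converts to a live channel and forwards any acknowledgement recorded earlier. */
        public LiveChannel toLiveChannel() {
            LiveChannel live = new LiveChannel();
            if (acknowledgementPending) {
                live.acknowledgeAllRecordsProcessed();
            }
            return live;
        }
    }
}
```

Whether the real conversion path can carry such state is exactly the open
question here; the sketch only shows the intended behaviour.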
##########
File path:
flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/BoundedStreamTask.java
##########
@@ -106,7 +106,8 @@ protected void processInput(MailboxDefaultAction.Controller controller) throws E
mainOperator.processElement(streamRecord);
} else {
mainOperator.endInput();
- controller.allActionsCompleted();
+ controller.suspendDefaultAction();
Review comment:
Is it necessary to suspend the default action? Or is it just a precaution?
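For context, the difference I have in mind, as a toy model. `MailboxLoopSketch`
is hypothetical and greatly simplified compared to the real mailbox processor
(for instance, it returns where a real loop would block waiting for mail). The
assumption is that `allActionsCompleted()` ends the loop, while
`suspendDefaultAction()` only stops the default action and still lets queued
mail run:

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical toy mailbox loop; a simplification, not Flink's MailboxProcessor. */
public class MailboxLoopSketch {
    private final Queue<Runnable> mailbox = new ArrayDeque<>();
    private boolean defaultActionSuspended = false;
    private boolean running = true;

    public void sendMail(Runnable mail) {
        mailbox.add(mail);
    }

    /** Stops further default-action invocations; queued mail is still processed. */
    public void suspendDefaultAction() {
        defaultActionSuspended = true;
    }

    /** Ends the loop; mail that is still queued is not processed by the loop. */
    public void allActionsCompleted() {
        running = false;
    }

    public void run(Runnable defaultAction) {
        while (running) {
            Runnable mail = mailbox.poll();
            if (mail != null) {
                mail.run(); // mail takes priority over the default action
                continue;
            }
            if (defaultActionSuspended) {
                return; // toy simplification: a real loop would wait for new mail here
            }
            defaultAction.run();
        }
    }
}
```

If something (for example a final checkpoint) still has to run as mail after
`endInput()`, suspending would be required rather than a precaution.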