[ https://issues.apache.org/jira/browse/FLINK-21032?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17268345#comment-17268345 ]
Leonard Xu commented on FLINK-21032:
------------------------------------
From the detailed log, the job has 3 compact-operator (streaming-writer) subtasks doing the compaction work. One of them got a checkpoint exception, did not finish the checkpoint, and did not produce the compaction files as expected.
{code:java}
11:36:47,761 [Source: Custom Source -> SourceConversion(table=[default_catalog.default_database.my_table], fields=[a, b, c]) -> Filter -> streaming-writer (2/3)#0] INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets [] - Subtask 1 checkpointing for checkpoint with id=4 (max part counter=66).
11:36:47,761 [Source: Custom Source -> SourceConversion(table=[default_catalog.default_database.my_table], fields=[a, b, c]) -> Filter -> streaming-writer (3/3)#0] INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets [] - Subtask 2 checkpointing for checkpoint with id=4 (max part counter=66).
11:36:47,762 [Source: Custom Source -> SourceConversion(table=[default_catalog.default_database.my_table], fields=[a, b, c]) -> Filter -> streaming-writer (1/3)#0] INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets [] - Subtask 0 checkpointing for checkpoint with id=4 (max part counter=68).
11:36:47,970 [compact-operator (1/3)#0] INFO org.apache.flink.streaming.api.operators.AbstractStreamOperator [] - Compaction time cost is '0.202S', target file is 'file:/tmp/junit8284797133733926241/junit3368655018742742833/compacted-part-10e63de4-6d59-4bfa-9292-c7741c360c96-0-34', input files are '[file:/tmp/junit8284797133733926241/junit3368655018742742833/.uncompacted-part-10e63de4-6d59-4bfa-9292-c7741c360c96-0-34...]'
11:36:47,977 [jobmanager-future-thread-26] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed checkpoint 4 for job 7c53c3e6a6dd259c6cfa396242680c31 (17514 bytes in 218 ms).
11:36:47,982 [AsyncOperations-thread-1] INFO org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable [] - Source: Custom Source -> SourceConversion(table=[default_catalog.default_database.my_table], fields=[a, b, c]) -> Filter -> streaming-writer (3/3)#0 - asynchronous part of checkpoint 5 could not be completed.
java.util.concurrent.CancellationException: null
	at java.util.concurrent.FutureTask.report(FutureTask.java:121) ~[?:1.8.0_275]
	at java.util.concurrent.FutureTask.get(FutureTask.java:192) ~[?:1.8.0_275]
	at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:621) ~[flink-runtime_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
	at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:60) ~[flink-streaming-java_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
	at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:122) [flink-streaming-java_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_275]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_275]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_275]
{code}
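When a test log is long, the failing subtask can be spotted by scanning for the AsyncCheckpointRunnable failure message shown above. The helper below is a hypothetical sketch (FailedCheckpointScanner and findFailures are illustrative names, not Flink APIs). It assumes the message format matches the one in this log exactly, and the sample lines are shortened copies of the log lines above.

{code:java}
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper (not part of Flink): lists subtasks whose async checkpoint part failed. */
public class FailedCheckpointScanner {

    // Assumed to match the AsyncCheckpointRunnable message format seen in the log, e.g.
    // "... -> streaming-writer (3/3)#0 - asynchronous part of checkpoint 5 could not be completed."
    private static final Pattern FAILED_ASYNC = Pattern.compile(
            "(\\S+) \\((\\d+)/(\\d+)\\)#\\d+ - asynchronous part of checkpoint (\\d+) could not be completed");

    /** Returns one entry per failure: "<operator> <subtask>/<parallelism> checkpoint <id>". */
    public static List<String> findFailures(List<String> logLines) {
        List<String> failures = new ArrayList<>();
        for (String line : logLines) {
            Matcher m = FAILED_ASYNC.matcher(line);
            if (m.find()) {
                failures.add(m.group(1) + " " + m.group(2) + "/" + m.group(3)
                        + " checkpoint " + m.group(4));
            }
        }
        return failures;
    }

    public static void main(String[] args) {
        // Two lines shortened from the log above; only the second one reports a failure.
        List<String> lines = Arrays.asList(
                "11:36:47,761 [... -> Filter -> streaming-writer (3/3)#0] INFO ... Buckets [] - Subtask 2 checkpointing for checkpoint with id=4 (max part counter=66).",
                "11:36:47,982 [AsyncOperations-thread-1] INFO ... AsyncCheckpointRunnable [] - Source: Custom Source -> ... -> Filter -> streaming-writer (3/3)#0 - asynchronous part of checkpoint 5 could not be completed.");
        // prints [streaming-writer 3/3 checkpoint 5]
        System.out.println(findFailures(lines));
    }
}
{code}

Note that the "(3/3)" in the message is 1-based, so it refers to the same subtask that the Buckets lines call "Subtask 2".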
As a result, when we read the data back from the compacted files, some of it was lost.
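To see exactly which rows were lost, the expected and actual lists from the assertion message can be compared as multisets, since every row is expected twice. The sketch below is a hypothetical helper (MissingRowFinder is an illustrative name, not test code from Flink). It assumes that comparing the rows' string forms is good enough, and its sample input is the first few rows of each list in the assertion message.

{code:java}
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical helper (not part of Flink): multiset difference of expected vs. actual rows. */
public class MissingRowFinder {

    /** Maps each row to how many more times it occurs in expected than in actual (positive counts only). */
    public static Map<String, Integer> missingRows(List<String> expected, List<String> actual) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String row : expected) {
            counts.merge(row, 1, Integer::sum);
        }
        for (String row : actual) {
            counts.merge(row, -1, Integer::sum);
        }
        // Drop rows that are fully present (0) or over-represented (negative) in the actual output.
        counts.values().removeIf(c -> c <= 0);
        return counts;
    }

    public static void main(String[] args) {
        // First rows of the expected and actual lists from the assertion message, compared as strings.
        List<String> expected = Arrays.asList(
                "+I[0, 0, 0]", "+I[0, 0, 0]", "+I[1, 1, 1]", "+I[1, 1, 1]", "+I[2, 2, 2]", "+I[2, 2, 2]");
        List<String> actual = Arrays.asList(
                "+I[0, 0, 0]", "+I[0, 0, 0]", "+I[1, 1, 1]", "+I[1, 1, 1]", "+I[2, 2, 2]");
        // prints {+I[2, 2, 2]=1}
        System.out.println(missingRows(expected, actual));
    }
}
{code}

Applied to the complete lists in the assertion message quoted below, this kind of diff shows one missing copy of each row whose first field is 2, 5, 8, ..., 98, that is, every row whose first field leaves remainder 2 when divided by 3 (33 rows in total).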
I think this test fails because the source operator (ParallelFiniteTestSource) still runs with a parallelism of 3 rather than 1. When the source has multiple parallel subtasks and finishes during the test (in a real job the source would not stop), one source subtask may receive endInput while the other source subtasks are still running or waiting for a checkpoint, and this situation may lead to this problem.
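The suspected failure mode can be illustrated with a deliberately simplified toy model in plain Java. This is not Flink's checkpointing or sink code. It only assumes, loosely mirroring how the filesystem sink finalizes pending files after a checkpoint completes, that a writer's records stay invisible until a checkpoint covering them completes. PendingCommitModel, WriterSubtask and simulate are hypothetical names.

{code:java}
import java.util.ArrayList;
import java.util.List;

/**
 * Toy model (not Flink code): each writer subtask buffers records as "pending"
 * and only makes them visible once a checkpoint covering them completes.
 */
public class PendingCommitModel {

    static final class WriterSubtask {
        final List<Integer> pending = new ArrayList<>();
        final List<Integer> committed = new ArrayList<>();

        void write(int record) {
            pending.add(record);
        }

        /** Called only when a checkpoint covering this subtask's pending records completes. */
        void onCheckpointComplete() {
            committed.addAll(pending);
            pending.clear();
        }
    }

    /**
     * Runs the scenario and returns how many written records became visible.
     * finishedEarly is the index of the subtask that ends before its records are covered (-1 for none).
     */
    public static int simulate(int parallelism, int recordsPerSubtask, int finishedEarly) {
        List<WriterSubtask> subtasks = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            subtasks.add(new WriterSubtask());
        }
        // Each parallel source subtask emits its records to its own writer subtask.
        for (WriterSubtask s : subtasks) {
            for (int r = 0; r < recordsPerSubtask; r++) {
                s.write(r);
            }
        }
        // The early-finishing subtask never sees a checkpoint that covers its pending records;
        // the others keep running until such a checkpoint completes.
        for (int i = 0; i < parallelism; i++) {
            if (i != finishedEarly) {
                subtasks.get(i).onCheckpointComplete();
            }
        }
        int visible = 0;
        for (WriterSubtask s : subtasks) {
            visible += s.committed.size();
        }
        return visible;
    }

    public static void main(String[] args) {
        // 3 subtasks x 10 records; subtask index 2 finishes before its records are committed.
        // prints visible=20 of 30
        System.out.println("visible=" + simulate(3, 10, 2) + " of 30");
    }
}
{code}

With a single source subtask, as the test name testSingleParallelism suggests was intended, there would be no other subtasks still running or checkpointing when the source finishes.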
> JsonFileCompactionITCase fails on azure
> ---------------------------------------
>
> Key: FLINK-21032
> URL: https://issues.apache.org/jira/browse/FLINK-21032
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Ecosystem
> Reporter: Dawid Wysakowicz
> Priority: Blocker
> Labels: test-stability
> Fix For: 1.13.0
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=12230&view=logs&j=d44f43ce-542c-597d-bf94-b0718c71e5e8&t=03dca39c-73e8-5aaf-601d-328ae5c35f20
> {code}
> [ERROR] Tests run: 3, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 11.39 s <<< FAILURE! - in org.apache.flink.formats.json.JsonFileCompactionITCase
> [ERROR] testSingleParallelism(org.apache.flink.formats.json.JsonFileCompactionITCase) Time elapsed: 1.21 s <<< FAILURE!
> java.lang.AssertionError: expected:<[+I[0, 0, 0], +I[0, 0, 0], +I[1, 1, 1],
> +I[1, 1, 1], +I[2, 2, 2], +I[2, 2, 2], +I[3, 3, 3], +I[3, 3, 3], +I[4, 4, 4],
> +I[4, 4, 4], +I[5, 5, 5], +I[5, 5, 5], +I[6, 6, 6], +I[6, 6, 6], +I[7, 7, 7],
> +I[7, 7, 7], +I[8, 8, 8], +I[8, 8, 8], +I[9, 9, 9], +I[9, 9, 9], +I[10, 0,
> 0], +I[10, 0, 0], +I[11, 1, 1], +I[11, 1, 1], +I[12, 2, 2], +I[12, 2, 2],
> +I[13, 3, 3], +I[13, 3, 3], +I[14, 4, 4], +I[14, 4, 4], +I[15, 5, 5], +I[15,
> 5, 5], +I[16, 6, 6], +I[16, 6, 6], +I[17, 7, 7], +I[17, 7, 7], +I[18, 8, 8],
> +I[18, 8, 8], +I[19, 9, 9], +I[19, 9, 9], +I[20, 0, 0], +I[20, 0, 0], +I[21,
> 1, 1], +I[21, 1, 1], +I[22, 2, 2], +I[22, 2, 2], +I[23, 3, 3], +I[23, 3, 3],
> +I[24, 4, 4], +I[24, 4, 4], +I[25, 5, 5], +I[25, 5, 5], +I[26, 6, 6], +I[26,
> 6, 6], +I[27, 7, 7], +I[27, 7, 7], +I[28, 8, 8], +I[28, 8, 8], +I[29, 9, 9],
> +I[29, 9, 9], +I[30, 0, 0], +I[30, 0, 0], +I[31, 1, 1], +I[31, 1, 1], +I[32,
> 2, 2], +I[32, 2, 2], +I[33, 3, 3], +I[33, 3, 3], +I[34, 4, 4], +I[34, 4, 4],
> +I[35, 5, 5], +I[35, 5, 5], +I[36, 6, 6], +I[36, 6, 6], +I[37, 7, 7], +I[37,
> 7, 7], +I[38, 8, 8], +I[38, 8, 8], +I[39, 9, 9], +I[39, 9, 9], +I[40, 0, 0],
> +I[40, 0, 0], +I[41, 1, 1], +I[41, 1, 1], +I[42, 2, 2], +I[42, 2, 2], +I[43,
> 3, 3], +I[43, 3, 3], +I[44, 4, 4], +I[44, 4, 4], +I[45, 5, 5], +I[45, 5, 5],
> +I[46, 6, 6], +I[46, 6, 6], +I[47, 7, 7], +I[47, 7, 7], +I[48, 8, 8], +I[48,
> 8, 8], +I[49, 9, 9], +I[49, 9, 9], +I[50, 0, 0], +I[50, 0, 0], +I[51, 1, 1],
> +I[51, 1, 1], +I[52, 2, 2], +I[52, 2, 2], +I[53, 3, 3], +I[53, 3, 3], +I[54,
> 4, 4], +I[54, 4, 4], +I[55, 5, 5], +I[55, 5, 5], +I[56, 6, 6], +I[56, 6, 6],
> +I[57, 7, 7], +I[57, 7, 7], +I[58, 8, 8], +I[58, 8, 8], +I[59, 9, 9], +I[59,
> 9, 9], +I[60, 0, 0], +I[60, 0, 0], +I[61, 1, 1], +I[61, 1, 1], +I[62, 2, 2],
> +I[62, 2, 2], +I[63, 3, 3], +I[63, 3, 3], +I[64, 4, 4], +I[64, 4, 4], +I[65,
> 5, 5], +I[65, 5, 5], +I[66, 6, 6], +I[66, 6, 6], +I[67, 7, 7], +I[67, 7, 7],
> +I[68, 8, 8], +I[68, 8, 8], +I[69, 9, 9], +I[69, 9, 9], +I[70, 0, 0], +I[70,
> 0, 0], +I[71, 1, 1], +I[71, 1, 1], +I[72, 2, 2], +I[72, 2, 2], +I[73, 3, 3],
> +I[73, 3, 3], +I[74, 4, 4], +I[74, 4, 4], +I[75, 5, 5], +I[75, 5, 5], +I[76,
> 6, 6], +I[76, 6, 6], +I[77, 7, 7], +I[77, 7, 7], +I[78, 8, 8], +I[78, 8, 8],
> +I[79, 9, 9], +I[79, 9, 9], +I[80, 0, 0], +I[80, 0, 0], +I[81, 1, 1], +I[81,
> 1, 1], +I[82, 2, 2], +I[82, 2, 2], +I[83, 3, 3], +I[83, 3, 3], +I[84, 4, 4],
> +I[84, 4, 4], +I[85, 5, 5], +I[85, 5, 5], +I[86, 6, 6], +I[86, 6, 6], +I[87,
> 7, 7], +I[87, 7, 7], +I[88, 8, 8], +I[88, 8, 8], +I[89, 9, 9], +I[89, 9, 9],
> +I[90, 0, 0], +I[90, 0, 0], +I[91, 1, 1], +I[91, 1, 1], +I[92, 2, 2], +I[92,
> 2, 2], +I[93, 3, 3], +I[93, 3, 3], +I[94, 4, 4], +I[94, 4, 4], +I[95, 5, 5],
> +I[95, 5, 5], +I[96, 6, 6], +I[96, 6, 6], +I[97, 7, 7], +I[97, 7, 7], +I[98,
> 8, 8], +I[98, 8, 8], +I[99, 9, 9], +I[99, 9, 9]]> but was:<[+I[0, 0, 0],
> +I[0, 0, 0], +I[1, 1, 1], +I[1, 1, 1], +I[2, 2, 2], +I[3, 3, 3], +I[3, 3, 3],
> +I[4, 4, 4], +I[4, 4, 4], +I[5, 5, 5], +I[6, 6, 6], +I[6, 6, 6], +I[7, 7, 7],
> +I[7, 7, 7], +I[8, 8, 8], +I[9, 9, 9], +I[9, 9, 9], +I[10, 0, 0], +I[10, 0,
> 0], +I[11, 1, 1], +I[12, 2, 2], +I[12, 2, 2], +I[13, 3, 3], +I[13, 3, 3],
> +I[14, 4, 4], +I[15, 5, 5], +I[15, 5, 5], +I[16, 6, 6], +I[16, 6, 6], +I[17,
> 7, 7], +I[18, 8, 8], +I[18, 8, 8], +I[19, 9, 9], +I[19, 9, 9], +I[20, 0, 0],
> +I[21, 1, 1], +I[21, 1, 1], +I[22, 2, 2], +I[22, 2, 2], +I[23, 3, 3], +I[24,
> 4, 4], +I[24, 4, 4], +I[25, 5, 5], +I[25, 5, 5], +I[26, 6, 6], +I[27, 7, 7],
> +I[27, 7, 7], +I[28, 8, 8], +I[28, 8, 8], +I[29, 9, 9], +I[30, 0, 0], +I[30,
> 0, 0], +I[31, 1, 1], +I[31, 1, 1], +I[32, 2, 2], +I[33, 3, 3], +I[33, 3, 3],
> +I[34, 4, 4], +I[34, 4, 4], +I[35, 5, 5], +I[36, 6, 6], +I[36, 6, 6], +I[37,
> 7, 7], +I[37, 7, 7], +I[38, 8, 8], +I[39, 9, 9], +I[39, 9, 9], +I[40, 0, 0],
> +I[40, 0, 0], +I[41, 1, 1], +I[42, 2, 2], +I[42, 2, 2], +I[43, 3, 3], +I[43,
> 3, 3], +I[44, 4, 4], +I[45, 5, 5], +I[45, 5, 5], +I[46, 6, 6], +I[46, 6, 6],
> +I[47, 7, 7], +I[48, 8, 8], +I[48, 8, 8], +I[49, 9, 9], +I[49, 9, 9], +I[50,
> 0, 0], +I[51, 1, 1], +I[51, 1, 1], +I[52, 2, 2], +I[52, 2, 2], +I[53, 3, 3],
> +I[54, 4, 4], +I[54, 4, 4], +I[55, 5, 5], +I[55, 5, 5], +I[56, 6, 6], +I[57,
> 7, 7], +I[57, 7, 7], +I[58, 8, 8], +I[58, 8, 8], +I[59, 9, 9], +I[60, 0, 0],
> +I[60, 0, 0], +I[61, 1, 1], +I[61, 1, 1], +I[62, 2, 2], +I[63, 3, 3], +I[63,
> 3, 3], +I[64, 4, 4], +I[64, 4, 4], +I[65, 5, 5], +I[66, 6, 6], +I[66, 6, 6],
> +I[67, 7, 7], +I[67, 7, 7], +I[68, 8, 8], +I[69, 9, 9], +I[69, 9, 9], +I[70,
> 0, 0], +I[70, 0, 0], +I[71, 1, 1], +I[72, 2, 2], +I[72, 2, 2], +I[73, 3, 3],
> +I[73, 3, 3], +I[74, 4, 4], +I[75, 5, 5], +I[75, 5, 5], +I[76, 6, 6], +I[76,
> 6, 6], +I[77, 7, 7], +I[78, 8, 8], +I[78, 8, 8], +I[79, 9, 9], +I[79, 9, 9],
> +I[80, 0, 0], +I[81, 1, 1], +I[81, 1, 1], +I[82, 2, 2], +I[82, 2, 2], +I[83,
> 3, 3], +I[84, 4, 4], +I[84, 4, 4], +I[85, 5, 5], +I[85, 5, 5], +I[86, 6, 6],
> +I[87, 7, 7], +I[87, 7, 7], +I[88, 8, 8], +I[88, 8, 8], +I[89, 9, 9], +I[90,
> 0, 0], +I[90, 0, 0], +I[91, 1, 1], +I[91, 1, 1], +I[92, 2, 2], +I[93, 3, 3],
> +I[93, 3, 3], +I[94, 4, 4], +I[94, 4, 4], +I[95, 5, 5], +I[96, 6, 6], +I[96,
> 6, 6], +I[97, 7, 7], +I[97, 7, 7], +I[98, 8, 8], +I[99, 9, 9], +I[99, 9, 9]]>
> at org.junit.Assert.fail(Assert.java:88)
> at org.junit.Assert.failNotEquals(Assert.java:834)
> at org.junit.Assert.assertEquals(Assert.java:118)
> at org.junit.Assert.assertEquals(Assert.java:144)
> at org.apache.flink.table.planner.runtime.stream.sql.CompactionITCaseBase.assertIterator(CompactionITCaseBase.java:134)
> at org.apache.flink.table.planner.runtime.stream.sql.CompactionITCaseBase.innerTestNonPartition(CompactionITCaseBase.java:109)
> at org.apache.flink.table.planner.runtime.stream.sql.CompactionITCaseBase.testSingleParallelism(CompactionITCaseBase.java:96)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
> at org.junit.rules.ExpectedException$ExpectedExceptionStatement.evaluate(ExpectedException.java:239)
> at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
> at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298)
> at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at java.lang.Thread.run(Thread.java:748)
> {code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)