[ https://issues.apache.org/jira/browse/FLINK-21032?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17268345#comment-17268345 ]

Leonard Xu edited comment on FLINK-21032 at 1/20/21, 3:19 AM:
--------------------------------------------------------------

 

From the detailed log, the job has 3 compact-operator (streaming-writer) subtasks doing the compaction work, but one of them got a checkpoint exception, did not finish the checkpoint, and did not produce the compacted files as expected.
{code:java}
11:36:47,761 [Source: Custom Source -> SourceConversion(table=[default_catalog.default_database.my_table], fields=[a, b, c]) -> Filter -> streaming-writer (2/3)#0] INFO  org.apache.flink.streaming.api.functions.sink.filesystem.Buckets [] - Subtask 1 checkpointing for checkpoint with id=4 (max part counter=66).
11:36:47,761 [Source: Custom Source -> SourceConversion(table=[default_catalog.default_database.my_table], fields=[a, b, c]) -> Filter -> streaming-writer (3/3)#0] INFO  org.apache.flink.streaming.api.functions.sink.filesystem.Buckets [] - Subtask 2 checkpointing for checkpoint with id=4 (max part counter=66).
11:36:47,762 [Source: Custom Source -> SourceConversion(table=[default_catalog.default_database.my_table], fields=[a, b, c]) -> Filter -> streaming-writer (1/3)#0] INFO  org.apache.flink.streaming.api.functions.sink.filesystem.Buckets [] - Subtask 0 checkpointing for checkpoint with id=4 (max part counter=68).
11:36:47,970 [compact-operator (1/3)#0] INFO  org.apache.flink.streaming.api.operators.AbstractStreamOperator [] - Compaction time cost is '0.202S', target file is 'file:/tmp/junit8284797133733926241/junit3368655018742742833/compacted-part-10e63de4-6d59-4bfa-9292-c7741c360c96-0-34', input files are '[file:/tmp/junit8284797133733926241/junit3368655018742742833/.uncompacted-part-10e63de4-6d59-4bfa-9292-c7741c360c96-0-34...]'
11:36:47,977 [jobmanager-future-thread-26] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Completed checkpoint 4 for job 7c53c3e6a6dd259c6cfa396242680c31 (17514 bytes in 218 ms).
11:36:47,982 [AsyncOperations-thread-1] INFO  org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable [] - Source: Custom Source -> SourceConversion(table=[default_catalog.default_database.my_table], fields=[a, b, c]) -> Filter -> streaming-writer (3/3)#0 - asynchronous part of checkpoint 5 could not be completed.
java.util.concurrent.CancellationException: null
    at java.util.concurrent.FutureTask.report(FutureTask.java:121) ~[?:1.8.0_275]
    at java.util.concurrent.FutureTask.get(FutureTask.java:192) ~[?:1.8.0_275]
    at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:621) ~[flink-runtime_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:60) ~[flink-streaming-java_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:122) [flink-streaming-java_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_275]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_275]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_275]
{code}
 

As a result, when we read back from the compacted files, we found that data was lost.
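To see exactly which rows were lost, the expected and actual results from the assertion message can be compared as multisets. Below is a minimal sketch in Java; MissingRows and missing are hypothetical helper names, not part of Flink or of the test code. Doing this comparison on the assertion output in the quoted issue below, exactly one copy is missing for every row whose first field i satisfies i % 3 == 2 (33 of the 200 expected rows), which would be consistent with the output of one of the three parallel subtasks going missing.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical helper (not part of Flink or the ITCase): computes the
 * multiset difference "expected minus actual", i.e. how many copies of
 * each row are missing from the actual result.
 */
public class MissingRows {

    /** Returns row -> number of copies present in expected but absent from actual. */
    public static Map<String, Integer> missing(List<String> expected, List<String> actual) {
        // Count every occurrence of each row in the expected result.
        Map<String, Integer> counts = new TreeMap<>();
        for (String row : expected) {
            counts.merge(row, 1, Integer::sum);
        }
        // Subtract one for every occurrence found in the actual result.
        for (String row : actual) {
            counts.merge(row, -1, Integer::sum);
        }
        // Keep only rows with a positive remainder, i.e. rows that were lost.
        // Rows that appear only in the actual result end up negative and are dropped.
        counts.values().removeIf(c -> c <= 0);
        return counts;
    }

    public static void main(String[] args) {
        // Toy input in the same "+I[...]" notation as the assertion message.
        List<String> expected = Arrays.asList(
                "+I[0, 0, 0]", "+I[0, 0, 0]", "+I[2, 2, 2]", "+I[2, 2, 2]");
        List<String> actual = Arrays.asList(
                "+I[0, 0, 0]", "+I[0, 0, 0]", "+I[2, 2, 2]");
        System.out.println(missing(expected, actual)); // prints {+I[2, 2, 2]=1}
    }
}
```

Feeding it the full expected and actual lists from the failure message gives the per-row loss directly, instead of eyeballing two 200-element lists.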

I think this test failed because the source operator (ParallelFiniteTestSource) still runs with a parallelism of 3 rather than 1. If the source has multiple parallel subtasks and terminates (as it does in the test; in a real job the source would not stop), then after one source subtask has received endInput, the other source subtasks may still be waiting for a checkpoint or still be running, and this situation may lead to this problem.
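A toy model can make the suspected mechanism more concrete. It assumes, in simplified form, that a file sink only makes records visible once a checkpoint that covered them completes; the class EndOfInputRaceModel, its Writer, and the pending/in-flight/committed lists are hypothetical and are not Flink's implementation. If the asynchronous part of one subtask's last checkpoint is cancelled because that subtask is finishing (compare checkpoint 5 of streaming-writer (3/3) in the log above), the records that subtask had snapshotted are never committed.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical toy model, NOT Flink's implementation: a writer subtask only
 * makes records visible when a checkpoint it snapshotted completes. If that
 * checkpoint is cancelled (e.g. because the subtask finished), the records
 * snapshotted for it are dropped.
 */
public class EndOfInputRaceModel {

    static final class Writer {
        private final List<Integer> pending = new ArrayList<>();  // written, not yet snapshotted
        private final List<Integer> inFlight = new ArrayList<>(); // snapshotted, awaiting completion
        final List<Integer> committed = new ArrayList<>();        // visible to the reader

        void write(int record) { pending.add(record); }

        /** Hand all pending records over to the in-flight checkpoint. */
        void snapshot() { inFlight.addAll(pending); pending.clear(); }

        /** The checkpoint completed: in-flight records become visible. */
        void checkpointCompleted() { committed.addAll(inFlight); inFlight.clear(); }

        /** The asynchronous part was cancelled: in-flight records are dropped. */
        void checkpointCancelled() { inFlight.clear(); }
    }

    /**
     * Every writer receives recordsPerWriter records and snapshots them for the
     * last checkpoint. That checkpoint completes for all writers except
     * finishingWriter (pass -1 if no writer finishes early).
     * Returns the total number of committed records.
     */
    public static int run(int parallelism, int recordsPerWriter, int finishingWriter) {
        List<Writer> writers = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            writers.add(new Writer());
        }
        for (Writer w : writers) {
            for (int r = 0; r < recordsPerWriter; r++) {
                w.write(r);
            }
            w.snapshot();
        }
        int committed = 0;
        for (int i = 0; i < parallelism; i++) {
            Writer w = writers.get(i);
            if (i == finishingWriter) {
                w.checkpointCancelled();
            } else {
                w.checkpointCompleted();
            }
            committed += w.committed.size();
        }
        return committed;
    }

    public static void main(String[] args) {
        System.out.println(run(3, 10, -1)); // prints 30: nothing lost
        System.out.println(run(3, 10, 2));  // prints 20: the finishing writer's records are lost
    }
}
```

The model deliberately leaves out the source and the compaction step; it only illustrates why records can go missing when one parallel subtask finishes before its last checkpoint completes, not how Flink actually schedules or cancels checkpoints.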

 

What do you think? [~dwysakowicz] [~lzljs3620320]



> JsonFileCompactionITCase fails on azure
> ---------------------------------------
>
>                 Key: FLINK-21032
>                 URL: https://issues.apache.org/jira/browse/FLINK-21032
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Ecosystem
>            Reporter: Dawid Wysakowicz
>            Priority: Blocker
>              Labels: test-stability
>             Fix For: 1.13.0
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=12230&view=logs&j=d44f43ce-542c-597d-bf94-b0718c71e5e8&t=03dca39c-73e8-5aaf-601d-328ae5c35f20
> {code}
> [ERROR] Tests run: 3, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 11.39 s <<< FAILURE! - in org.apache.flink.formats.json.JsonFileCompactionITCase
> [ERROR] testSingleParallelism(org.apache.flink.formats.json.JsonFileCompactionITCase)  Time elapsed: 1.21 s  <<< FAILURE!
> java.lang.AssertionError: expected:<[+I[0, 0, 0], +I[0, 0, 0], +I[1, 1, 1], 
> +I[1, 1, 1], +I[2, 2, 2], +I[2, 2, 2], +I[3, 3, 3], +I[3, 3, 3], +I[4, 4, 4], 
> +I[4, 4, 4], +I[5, 5, 5], +I[5, 5, 5], +I[6, 6, 6], +I[6, 6, 6], +I[7, 7, 7], 
> +I[7, 7, 7], +I[8, 8, 8], +I[8, 8, 8], +I[9, 9, 9], +I[9, 9, 9], +I[10, 0, 
> 0], +I[10, 0, 0], +I[11, 1, 1], +I[11, 1, 1], +I[12, 2, 2], +I[12, 2, 2], 
> +I[13, 3, 3], +I[13, 3, 3], +I[14, 4, 4], +I[14, 4, 4], +I[15, 5, 5], +I[15, 
> 5, 5], +I[16, 6, 6], +I[16, 6, 6], +I[17, 7, 7], +I[17, 7, 7], +I[18, 8, 8], 
> +I[18, 8, 8], +I[19, 9, 9], +I[19, 9, 9], +I[20, 0, 0], +I[20, 0, 0], +I[21, 
> 1, 1], +I[21, 1, 1], +I[22, 2, 2], +I[22, 2, 2], +I[23, 3, 3], +I[23, 3, 3], 
> +I[24, 4, 4], +I[24, 4, 4], +I[25, 5, 5], +I[25, 5, 5], +I[26, 6, 6], +I[26, 
> 6, 6], +I[27, 7, 7], +I[27, 7, 7], +I[28, 8, 8], +I[28, 8, 8], +I[29, 9, 9], 
> +I[29, 9, 9], +I[30, 0, 0], +I[30, 0, 0], +I[31, 1, 1], +I[31, 1, 1], +I[32, 
> 2, 2], +I[32, 2, 2], +I[33, 3, 3], +I[33, 3, 3], +I[34, 4, 4], +I[34, 4, 4], 
> +I[35, 5, 5], +I[35, 5, 5], +I[36, 6, 6], +I[36, 6, 6], +I[37, 7, 7], +I[37, 
> 7, 7], +I[38, 8, 8], +I[38, 8, 8], +I[39, 9, 9], +I[39, 9, 9], +I[40, 0, 0], 
> +I[40, 0, 0], +I[41, 1, 1], +I[41, 1, 1], +I[42, 2, 2], +I[42, 2, 2], +I[43, 
> 3, 3], +I[43, 3, 3], +I[44, 4, 4], +I[44, 4, 4], +I[45, 5, 5], +I[45, 5, 5], 
> +I[46, 6, 6], +I[46, 6, 6], +I[47, 7, 7], +I[47, 7, 7], +I[48, 8, 8], +I[48, 
> 8, 8], +I[49, 9, 9], +I[49, 9, 9], +I[50, 0, 0], +I[50, 0, 0], +I[51, 1, 1], 
> +I[51, 1, 1], +I[52, 2, 2], +I[52, 2, 2], +I[53, 3, 3], +I[53, 3, 3], +I[54, 
> 4, 4], +I[54, 4, 4], +I[55, 5, 5], +I[55, 5, 5], +I[56, 6, 6], +I[56, 6, 6], 
> +I[57, 7, 7], +I[57, 7, 7], +I[58, 8, 8], +I[58, 8, 8], +I[59, 9, 9], +I[59, 
> 9, 9], +I[60, 0, 0], +I[60, 0, 0], +I[61, 1, 1], +I[61, 1, 1], +I[62, 2, 2], 
> +I[62, 2, 2], +I[63, 3, 3], +I[63, 3, 3], +I[64, 4, 4], +I[64, 4, 4], +I[65, 
> 5, 5], +I[65, 5, 5], +I[66, 6, 6], +I[66, 6, 6], +I[67, 7, 7], +I[67, 7, 7], 
> +I[68, 8, 8], +I[68, 8, 8], +I[69, 9, 9], +I[69, 9, 9], +I[70, 0, 0], +I[70, 
> 0, 0], +I[71, 1, 1], +I[71, 1, 1], +I[72, 2, 2], +I[72, 2, 2], +I[73, 3, 3], 
> +I[73, 3, 3], +I[74, 4, 4], +I[74, 4, 4], +I[75, 5, 5], +I[75, 5, 5], +I[76, 
> 6, 6], +I[76, 6, 6], +I[77, 7, 7], +I[77, 7, 7], +I[78, 8, 8], +I[78, 8, 8], 
> +I[79, 9, 9], +I[79, 9, 9], +I[80, 0, 0], +I[80, 0, 0], +I[81, 1, 1], +I[81, 
> 1, 1], +I[82, 2, 2], +I[82, 2, 2], +I[83, 3, 3], +I[83, 3, 3], +I[84, 4, 4], 
> +I[84, 4, 4], +I[85, 5, 5], +I[85, 5, 5], +I[86, 6, 6], +I[86, 6, 6], +I[87, 
> 7, 7], +I[87, 7, 7], +I[88, 8, 8], +I[88, 8, 8], +I[89, 9, 9], +I[89, 9, 9], 
> +I[90, 0, 0], +I[90, 0, 0], +I[91, 1, 1], +I[91, 1, 1], +I[92, 2, 2], +I[92, 
> 2, 2], +I[93, 3, 3], +I[93, 3, 3], +I[94, 4, 4], +I[94, 4, 4], +I[95, 5, 5], 
> +I[95, 5, 5], +I[96, 6, 6], +I[96, 6, 6], +I[97, 7, 7], +I[97, 7, 7], +I[98, 
> 8, 8], +I[98, 8, 8], +I[99, 9, 9], +I[99, 9, 9]]> but was:<[+I[0, 0, 0], 
> +I[0, 0, 0], +I[1, 1, 1], +I[1, 1, 1], +I[2, 2, 2], +I[3, 3, 3], +I[3, 3, 3], 
> +I[4, 4, 4], +I[4, 4, 4], +I[5, 5, 5], +I[6, 6, 6], +I[6, 6, 6], +I[7, 7, 7], 
> +I[7, 7, 7], +I[8, 8, 8], +I[9, 9, 9], +I[9, 9, 9], +I[10, 0, 0], +I[10, 0, 
> 0], +I[11, 1, 1], +I[12, 2, 2], +I[12, 2, 2], +I[13, 3, 3], +I[13, 3, 3], 
> +I[14, 4, 4], +I[15, 5, 5], +I[15, 5, 5], +I[16, 6, 6], +I[16, 6, 6], +I[17, 
> 7, 7], +I[18, 8, 8], +I[18, 8, 8], +I[19, 9, 9], +I[19, 9, 9], +I[20, 0, 0], 
> +I[21, 1, 1], +I[21, 1, 1], +I[22, 2, 2], +I[22, 2, 2], +I[23, 3, 3], +I[24, 
> 4, 4], +I[24, 4, 4], +I[25, 5, 5], +I[25, 5, 5], +I[26, 6, 6], +I[27, 7, 7], 
> +I[27, 7, 7], +I[28, 8, 8], +I[28, 8, 8], +I[29, 9, 9], +I[30, 0, 0], +I[30, 
> 0, 0], +I[31, 1, 1], +I[31, 1, 1], +I[32, 2, 2], +I[33, 3, 3], +I[33, 3, 3], 
> +I[34, 4, 4], +I[34, 4, 4], +I[35, 5, 5], +I[36, 6, 6], +I[36, 6, 6], +I[37, 
> 7, 7], +I[37, 7, 7], +I[38, 8, 8], +I[39, 9, 9], +I[39, 9, 9], +I[40, 0, 0], 
> +I[40, 0, 0], +I[41, 1, 1], +I[42, 2, 2], +I[42, 2, 2], +I[43, 3, 3], +I[43, 
> 3, 3], +I[44, 4, 4], +I[45, 5, 5], +I[45, 5, 5], +I[46, 6, 6], +I[46, 6, 6], 
> +I[47, 7, 7], +I[48, 8, 8], +I[48, 8, 8], +I[49, 9, 9], +I[49, 9, 9], +I[50, 
> 0, 0], +I[51, 1, 1], +I[51, 1, 1], +I[52, 2, 2], +I[52, 2, 2], +I[53, 3, 3], 
> +I[54, 4, 4], +I[54, 4, 4], +I[55, 5, 5], +I[55, 5, 5], +I[56, 6, 6], +I[57, 
> 7, 7], +I[57, 7, 7], +I[58, 8, 8], +I[58, 8, 8], +I[59, 9, 9], +I[60, 0, 0], 
> +I[60, 0, 0], +I[61, 1, 1], +I[61, 1, 1], +I[62, 2, 2], +I[63, 3, 3], +I[63, 
> 3, 3], +I[64, 4, 4], +I[64, 4, 4], +I[65, 5, 5], +I[66, 6, 6], +I[66, 6, 6], 
> +I[67, 7, 7], +I[67, 7, 7], +I[68, 8, 8], +I[69, 9, 9], +I[69, 9, 9], +I[70, 
> 0, 0], +I[70, 0, 0], +I[71, 1, 1], +I[72, 2, 2], +I[72, 2, 2], +I[73, 3, 3], 
> +I[73, 3, 3], +I[74, 4, 4], +I[75, 5, 5], +I[75, 5, 5], +I[76, 6, 6], +I[76, 
> 6, 6], +I[77, 7, 7], +I[78, 8, 8], +I[78, 8, 8], +I[79, 9, 9], +I[79, 9, 9], 
> +I[80, 0, 0], +I[81, 1, 1], +I[81, 1, 1], +I[82, 2, 2], +I[82, 2, 2], +I[83, 
> 3, 3], +I[84, 4, 4], +I[84, 4, 4], +I[85, 5, 5], +I[85, 5, 5], +I[86, 6, 6], 
> +I[87, 7, 7], +I[87, 7, 7], +I[88, 8, 8], +I[88, 8, 8], +I[89, 9, 9], +I[90, 
> 0, 0], +I[90, 0, 0], +I[91, 1, 1], +I[91, 1, 1], +I[92, 2, 2], +I[93, 3, 3], 
> +I[93, 3, 3], +I[94, 4, 4], +I[94, 4, 4], +I[95, 5, 5], +I[96, 6, 6], +I[96, 
> 6, 6], +I[97, 7, 7], +I[97, 7, 7], +I[98, 8, 8], +I[99, 9, 9], +I[99, 9, 9]]>
>       at org.junit.Assert.fail(Assert.java:88)
>       at org.junit.Assert.failNotEquals(Assert.java:834)
>       at org.junit.Assert.assertEquals(Assert.java:118)
>       at org.junit.Assert.assertEquals(Assert.java:144)
>       at org.apache.flink.table.planner.runtime.stream.sql.CompactionITCaseBase.assertIterator(CompactionITCaseBase.java:134)
>       at org.apache.flink.table.planner.runtime.stream.sql.CompactionITCaseBase.innerTestNonPartition(CompactionITCaseBase.java:109)
>       at org.apache.flink.table.planner.runtime.stream.sql.CompactionITCaseBase.testSingleParallelism(CompactionITCaseBase.java:96)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:498)
>       at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>       at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>       at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>       at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>       at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>       at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>       at org.junit.rules.ExpectedException$ExpectedExceptionStatement.evaluate(ExpectedException.java:239)
>       at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>       at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298)
>       at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292)
>       at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>       at java.lang.Thread.run(Thread.java:748)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
