becketqin edited a comment on pull request #14239:
URL: https://github.com/apache/flink/pull/14239#issuecomment-735924235


   The patch migrates all the existing IT cases from `KafkaConsumerTestBase` 
for the new KafkaSource, except `runEndOfStreamTest()`, which is no longer 
applicable.
   
   In order to migrate the IT cases, the following two commits have to be 
cherry-picked from FLINK-20379.
   - Add a `GenericDeserializationSchema` that suits the new Source API. The 
`GenericDeserializationSchema` is intended to take care of the deserialization 
from an arbitrary record type, which is usually a `byte[]` with additional 
information.
   - Let the KafkaRecordDeserializer implement `GenericDeserializationSchema`. 
Add bridge methods to the existing `KafkaDeserializationSchema` implementations.
   
   Other than that, the following three commits are made:
   - 03173a7 fixes a bug where an NPE was thrown when the committed offset does 
not exist for a partition.
   - e1fe5fd adjusts the behavior of `SpecifiedOffsetsInitializer` to keep it the 
same as the legacy `FlinkKafkaConsumer`.
   - df413b4 disables offset auto-commit by default.
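
   To illustrate the kind of NPE that 03173a7 addresses, here is a minimal 
sketch, not the actual fix. It assumes the committed offsets arrive as a map 
in which a partition without a committed offset maps to `null` or is absent. 
The class name, the `startingOffset` helper, the string partition keys and the 
fallback value are all hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

public class CommittedOffsetFallback {

    /**
     * Returns the committed offset for a partition, or the fallback when
     * nothing has been committed (the map holds null or has no entry).
     */
    public static long startingOffset(
            Map<String, Long> committed, String partition, long fallback) {
        Long offset = committed.get(partition);
        // Unboxing a null Long directly would throw a NullPointerException,
        // so check for null before converting to a primitive long.
        return offset != null ? offset : fallback;
    }

    public static void main(String[] args) {
        Map<String, Long> committed = new HashMap<>();
        committed.put("topic-0", 42L);
        committed.put("topic-1", null); // partition known, nothing committed

        long fallback = 0L; // hypothetical fallback offset
        System.out.println(startingOffset(committed, "topic-0", fallback));
        System.out.println(startingOffset(committed, "topic-1", fallback));
        System.out.println(startingOffset(committed, "topic-2", fallback));
    }
}
```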
   
   I have verified the patch with a modified `StateMachineExample` with the 
following steps:
   1. Start a Kafka cluster and create a topic of 32 partitions.
   2. Start a Flink standalone cluster with 3 TMs.
   3. Submit the `KafkaEventsGeneratorJob` to generate records.
   4. Submit the `StateMachineExample` to process the records.
     - The job runs without exceptions.
     - Checkpoint works fine.
     - The offsets are committed back to Kafka correctly.
     - The WebUI shows the metrics correctly.
   5. Kill -9 the TM that runs the `StateMachineExample`.
     - The Task fails over correctly and resumes consumption from the last 
checkpointed offset.
   6. Cancel the `StateMachineExample` with a savepoint.
     - The job stops without an exception.
   7. Resume the `StateMachineExample` from the savepoint.
     - The job resumes correctly from the offsets at which it was canceled.
     - NOTE: The following exception may be seen in the TM log. It is a known 
issue in Kafka, caused by the same client-id being registered twice with JMX. 
I am not familiar enough with the mbeanserver to tell whether Flink should 
expect this, but a task should not leave side effects in the TM after it 
finishes.
   ```
   javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=myGroup-0
           at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437) ~[?:1.8.0_172]
           at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898) ~[?:1.8.0_172]
           at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966) ~[?:1.8.0_172]
           at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900) ~[?:1.8.0_172]
           at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324) ~[?:1.8.0_172]
           at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522) ~[?:1.8.0_172]
           at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:64) ~[blob_p-7527eaac8696e2e6d749b6e93038622aad2a03ee-2deed2871e614394bdf6e692044a2daf:1.12-SNAPSHOT]
           at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:814) ~[blob_p-7527eaac8696e2e6d749b6e93038622aad2a03ee-2deed2871e614394bdf6e692044a2daf:1.12-SNAPSHOT]
           at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:666) ~[blob_p-7527eaac8696e2e6d749b6e93038622aad2a03ee-2deed2871e614394bdf6e692044a2daf:1.12-SNAPSHOT]
           at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:646) ~[blob_p-7527eaac8696e2e6d749b6e93038622aad2a03ee-2deed2871e614394bdf6e692044a2daf:1.12-SNAPSHOT]
           at org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.<init>(KafkaPartitionSplitReader.java:84) ~[blob_p-7527eaac8696e2e6d749b6e93038622aad2a03ee-2deed2871e614394bdf6e692044a2daf:1.12-SNAPSHOT]
           at org.apache.flink.connector.kafka.source.KafkaSource.lambda$createReader$0(KafkaSource.java:121) ~[blob_p-7527eaac8696e2e6d749b6e93038622aad2a03ee-2deed2871e614394bdf6e692044a2daf:1.12-SNAPSHOT]
           at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.createSplitFetcher(SplitFetcherManager.java:129) ~[flink-table-blink_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
           at org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager.addSplits(SingleThreadFetcherManager.java:59) ~[flink-table-blink_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
           at org.apache.flink.connector.base.source.reader.SourceReaderBase.addSplits(SourceReaderBase.java:218) ~[flink-table-blink_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
           at org.apache.flink.streaming.api.operators.SourceOperator.open(SourceOperator.java:220) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
   ```
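
   For reference, the JMX behavior behind this exception can be reproduced 
with the JDK alone. The sketch below is not Kafka or Flink code. The `AppInfo` 
MBean, the `registerCollides` helper and the `demo.consumer` object name are 
made up for illustration. It shows that registering a second MBean under an 
already-registered `ObjectName` throws `InstanceAlreadyExistsException`, and 
that the name becomes usable again once the first instance is unregistered.

```java
import java.lang.management.ManagementFactory;
import javax.management.InstanceAlreadyExistsException;
import javax.management.MBeanServer;
import javax.management.ObjectName;

public class JmxDuplicateRegistration {

    // Standard MBean convention: the interface name is the class name + "MBean".
    public interface AppInfoMBean {
        String getVersion();
    }

    public static class AppInfo implements AppInfoMBean {
        @Override
        public String getVersion() {
            return "demo";
        }
    }

    /** Returns true if the name was already taken, false if registration succeeded. */
    public static boolean registerCollides(MBeanServer server, ObjectName name)
            throws Exception {
        try {
            server.registerMBean(new AppInfo(), name);
            return false;
        } catch (InstanceAlreadyExistsException e) {
            return true;
        }
    }

    public static void main(String[] args) throws Exception {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        // Hypothetical name, shaped like the one in the stack trace above.
        ObjectName name = new ObjectName("demo.consumer:type=app-info,id=myGroup-0");

        System.out.println("first registration collides: " + registerCollides(server, name));
        System.out.println("second registration collides: " + registerCollides(server, name));

        // Unregistering the first instance frees the name for reuse.
        server.unregisterMBean(name);
        System.out.println("after unregister collides: " + registerCollides(server, name));

        server.unregisterMBean(name); // clean up
    }
}
```

   If the same thing happens in the TM, it would suggest that the MBean from 
the previous task attempt was still registered when the new consumer with the 
same client-id was created, but I have not verified that this is the cause.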

