Github user bowenli86 commented on the issue:

    https://github.com/apache/flink/pull/4473
  
    hmmm... I was deploying our Flink job with this change. The Flink job 
failed to start, and log reports:
    
    ```
    The implementation of the RichSinkFunction is not serializable. The object 
probably contains or references non serializable fields.
        org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:100)
        
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1548)
        
org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:183)
        
org.apache.flink.streaming.api.datastream.DataStream.addSink(DataStream.java:1131)
    ```
    
    of which the implementation of the RichSinkFunction is 
FlinkKinesisProducer. The only field I add to FlinkKinesisProducer is 
KinesisProducerConfiguration. So I made KinesisProducerConfiguration 
`transient`, and ran the Flink job, the job still fails. 
    
    Thus I doubted if FlinkKinesisProducer is already not serializable 
currently. To verify that, I created a test for the current 
FlinkKinesisProducer in master which doesn't have my PR change. Unit test is:
    
    ```
    @Test
        public void testProducerSerializable() {
                Properties testConfig = new Properties();
                testConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, 
"accessKey");
                
testConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
                testConfig.setProperty(AWSConfigConstants.AWS_REGION, 
"us-east-1");
                
testConfig.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
                KinesisConfigUtil.validateAwsConfiguration(testConfig);
    
                FlinkKinesisProducer producer = new FlinkKinesisProducer(new 
KinesisSerializationSchema() {
                        @Override
                        public ByteBuffer serialize(Object element) {
                                return null;
                        }
    
                        @Override
                        public String getTargetStream(Object element) {
                                return null;
                        }
                }, testConfig);
    
                ClosureCleaner.ensureSerializable(producer);
        }
    ```
    
    And it fails. Thus, I think something in FlinkKinesisProducer might not 
serializable, and already breaks FlinkKinesisProducer. 
    
    @tzulitai @aljoscha  Any insights on this?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

Reply via email to