Github user bowenli86 commented on the issue:
https://github.com/apache/flink/pull/4473
Hmm, I deployed our Flink job with this change. The job failed to
start, and the log reports:
```
The implementation of the RichSinkFunction is not serializable. The object
probably contains or references non serializable fields.
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:100)
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1548)
org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:183)
org.apache.flink.streaming.api.datastream.DataStream.addSink(DataStream.java:1131)
```
Here the RichSinkFunction implementation is FlinkKinesisProducer. The
only field I added to FlinkKinesisProducer is
KinesisProducerConfiguration, so I made that field `transient` and reran
the Flink job, but the job still fails.
That made me suspect that FlinkKinesisProducer is already not
serializable. To verify that, I wrote a test against the current
FlinkKinesisProducer in master, which doesn't have my PR change. The unit
test is:
```
@Test
public void testProducerSerializable() {
    Properties testConfig = new Properties();
    testConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKey");
    testConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
    testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
    testConfig.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");

    KinesisConfigUtil.validateAwsConfiguration(testConfig);

    FlinkKinesisProducer producer = new FlinkKinesisProducer(new KinesisSerializationSchema() {
        @Override
        public ByteBuffer serialize(Object element) {
            return null;
        }

        @Override
        public String getTargetStream(Object element) {
            return null;
        }
    }, testConfig);

    ClosureCleaner.ensureSerializable(producer);
}
```
And it fails. So I think something in FlinkKinesisProducer may not be
serializable, which would mean FlinkKinesisProducer is already broken in
master.
@tzulitai @aljoscha Any insights on this?
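In case it helps narrow this down, here is a minimal sketch of how the
offending field could be located with plain Java serialization, outside
of Flink. Everything in it is hypothetical and for illustration only:
`SerializationProbe`, `firstSerializationProblem`, and the stand-in
classes are not part of Flink or the Kinesis connector. It assumes the
failure is an ordinary `java.io.NotSerializableException`, whose message
is the name of the class that could not be written.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical helper, not part of Flink: reports which class blocks serialization.
class SerializationProbe {

    // Stand-in for a field type that does not implement Serializable.
    static class NotSerializableConfig { }

    // Only the transient field is non-serializable, so this object serializes fine.
    static class TransientOnly implements Serializable {
        private static final long serialVersionUID = 1L;
        transient NotSerializableConfig config = new NotSerializableConfig();
    }

    // The transient field is skipped, but a second non-serializable field remains.
    static class HiddenField implements Serializable {
        private static final long serialVersionUID = 1L;
        transient NotSerializableConfig config = new NotSerializableConfig();
        final Object other = new Object(); // java.lang.Object is not Serializable
    }

    // Returns null if the object serializes, otherwise the name of the offending class.
    static String firstSerializationProblem(Object candidate) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(candidate);
            return null;
        } catch (NotSerializableException e) {
            return e.getMessage();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(firstSerializationProblem(new TransientOnly())); // prints "null"
        System.out.println(firstSerializationProblem(new HiddenField()));   // prints "java.lang.Object"
    }
}
```

Running the same kind of check against the producer built in the test
above should name the class that cannot be serialized, which would show
whether the problem is a field of FlinkKinesisProducer itself or
something reachable from the schema passed into it.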