Hi Jigar,

in order to run the sink function on the Flink cluster, Flink serializes it and ships it to the workers. Since you marked the repository field as transient, it is skipped during serialization and is therefore null after deserialization on the cluster. If Repository is Serializable, you can drop the transient modifier and ship it to the cluster. If not, you need to reconstruct the Repository on the cluster (e.g. on the first invoke call or in the open call of the RichSinkFunction).
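To illustrate the effect outside of Flink, here is a minimal sketch that uses only plain Java serialization. FakeRepository, LazyClientFactory and TransientDemo are hypothetical names made up for this example (FakeRepository stands in for the real, possibly non-serializable Repository). The factory ships only the connection settings and rebuilds the client lazily after deserialization:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for a non-serializable client such as Repository.
class FakeRepository {
    final String endpoint;

    FakeRepository(String endpoint) {
        this.endpoint = endpoint;
    }
}

// Serializable factory: only the connection settings are serialized.
class LazyClientFactory implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String protocol;
    private final String host;
    private final String port;

    // Skipped by Java serialization, so it is null after deserialization.
    private transient FakeRepository repository;

    LazyClientFactory(String protocol, String host, String port) {
        this.protocol = protocol;
        this.host = host;
        this.port = port;
    }

    boolean isInitialized() {
        return repository != null;
    }

    // Rebuilds the client on first use in whichever JVM calls this.
    FakeRepository getClient() {
        if (repository == null) {
            String endpoint = String.format("%s://%s:%s/sparql", protocol, host, port);
            repository = new FakeRepository(endpoint);
        }
        return repository;
    }
}

class TransientDemo {
    // Serializes and deserializes a value in memory, roughly what happens
    // when a function object is shipped to another JVM.
    static <T extends Serializable> T roundTrip(T value)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            @SuppressWarnings("unchecked")
            T copy = (T) in.readObject();
            return copy;
        }
    }

    public static void main(String[] args) throws Exception {
        LazyClientFactory local = new LazyClientFactory("https", "neptunehost", "8182");
        local.getClient(); // initialized where the job is built

        LazyClientFactory shipped = roundTrip(local);
        System.out.println(shipped.isInitialized());      // prints false
        System.out.println(shipped.getClient().endpoint); // prints https://neptunehost:8182/sparql
    }
}
```

In your sink, the same rebuild could happen in the open call instead of lazily in the getter. Either way the Repository is created on the cluster rather than in the constructor.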
Cheers,
Till

On Mon, Oct 11, 2021 at 10:12 AM Jigar Gajjar <jigargajjar2...@gmail.com> wrote:

> Hello Devs,
>
> Here is my custom sink code.
>
> `````````````````````````
>
> public class FlinkNeptuneSink<IN> extends RichSinkFunction<IN> {
>     static HttpClient client = HttpClient.newHttpClient();
>     private static final long serialVersionUID = 1L;
>     NeptuneClientFactory neptuneClientFactory;
>     JsonLDWriteContext jsonLDWriteContext;
>     String baseURI;
>     Map contextJsonMap;
>     String namespaceURI;
>
>     public FlinkNeptuneSink(String protocol, String host, String port,
>             String baseURI, Map contextJsonMap, String namespaceURI) {
>         neptuneClientFactory = new NeptuneClientFactory(protocol, host, port);
>
>         this.baseURI = baseURI;
>         this.contextJsonMap = contextJsonMap;
>         this.namespaceURI = namespaceURI;
>     }
>
>     @Override
>     public void invoke(IN value, Context context) throws IOException {
>         //neptuneClientFactory.getNeptuneClient() (repository attribute in neptuneClientFactory is null)
>         try (RepositoryConnection conn =
>                 neptuneClientFactory.getNeptuneClient().getConnection()) {
>         }
>
>     }
> }
>
> public class NeptuneClientFactory implements Serializable {
>     private transient Repository repository;
>
>     public NeptuneClientFactory(String protocol, String host, String port) {
>         this.repository = createNeptuneClient(protocol, host, port);
>     }
>
>     public static Repository createNeptuneClient(String protocol, String host, String port) {
>         String sparqlEndpoint = String.format("%s://%s:%s/sparql", protocol, host, port);
>         Repository repo = new SPARQLRepository(sparqlEndpoint);
>         repo.init();
>         return repo;
>     }
>
>     public Repository getNeptuneClient() {
>         return repository;
>     }
> }
>
>
> filtredNonEmptyP5.addSink(new FlinkNeptuneSink<>("https", "neptunehost",
>         "8182", "https://localhost/entity", contextJsonMap,
>         "https://localhost/namespaces/default"));
>
> `````````
>
> when it invokes method then only neptuneClientFactory has a repository
> value as null. not sure why, it has other attributes values properly set.
>
> Is flink initializing sink attributes from somewhere else?
> When I debug then while creating sink it initializes neptuneClientFactory
> properly but when it comes to invoke method then the repository is blank.
>
> Please help.
>
> --
> Thanks
> Jigar Gajjar