Hi Jigar,

in order to run the Sink function on the Flink cluster, it will be
serialized. Since you marked the repository as transient, it won't be
shipped to the cluster. So if Repository is Serializable, you can ship it
to the cluster. If not, then you need to reconstruct the Repository on the
cluster (e.g. on the first invoke call or the open call on the
RichSinkFunction).

Cheers,
Till

On Mon, Oct 11, 2021 at 10:12 AM Jigar Gajjar <jigargajjar2...@gmail.com>
wrote:

> Hello Devs,
>
>
> Here is my custom sink code.
>
> `````````````````````````
>
> public class FlinkNeptuneSink<IN> extends RichSinkFunction<IN> {
>     static HttpClient client = HttpClient.newHttpClient();
>     private static final long serialVersionUID = 1L;
>     NeptuneClientFactory neptuneClientFactory;
>     JsonLDWriteContext jsonLDWriteContext;
>     String baseURI;
>     Map contextJsonMap;
>     String namespaceURI;
>
>     public FlinkNeptuneSink(String protocol, String host, String port,
> String baseURI, Map contextJsonMap, String namespaceURI) {
>         neptuneClientFactory = new NeptuneClientFactory(protocol, host,
> port);
>
>         this.baseURI = baseURI;
>         this.contextJsonMap = contextJsonMap;
>         this.namespaceURI = namespaceURI;
>     }
>
>     @Override
>     public void invoke(IN value, Context context) throws IOException {
>     //neptuneClientFactory.getNeptuneClient()   (repository  attribute in
> neptuneClientFactory   is null)
>     try (RepositoryConnection conn =
> neptuneClientFactory.getNeptuneClient().getConnection())      {
>     }
>
>     }
> }
>
> public class NeptuneClientFactory implements Serializable {
>         private transient Repository repository;
>
>         public NeptuneClientFactory(String protocol, String host, String
> port) {
>             this.repository = createNeptuneClient(protocol, host, port);
>         }
>
>     public static Repository createNeptuneClient(String protocol, String
> host, String port) {
>         String sparqlEndpoint = String.format("%s://%s:%s/sparql",
> protocol, host, port);
>         Repository repo = new SPARQLRepository(sparqlEndpoint);
>         repo.init();
>         return repo;
>     }
>
>         public Repository getNeptuneClient() {
>             return repository;
>         }
> }
>
>
> filtredNonEmptyP5.addSink(new FlinkNeptuneSink<>("https", "neptunehost",
> "8182", "https://localhost/entity";, contextJsonMap, "
> https://localhost/namespaces/default";));
>
> `````````
>
> when  it invokes method then only neptuneClientFactory has a repository
> value as null. not sure why, it has other attributes values properly set.
>
> Is flink initializing sink attributes from somewhere else?
> When I debug  then while creating sink  it
> initializes neptuneClientFactory  properly but when it comes to invoke
> method then the repository is blank.
>
> Please help.
>
> --
> Thanks
> Jigar Gajjar
>

Reply via email to