If you make your bulk final, I think this could work: private final BulkProcessor bulk;
CrmApp() { Client esClient = new TransportClient( ImmutableSettings.builder().put("cluster.name", "devoxx") ).addTransportAddress( new InetSocketTransportAddress("127.0.0.1", 9300) ); bulk = BulkProcessor.builder(esClient, new BulkProcessor.Listener() { @Override public void beforeBulk(long executionId, BulkRequest request) { logger.debug("[{}] going to execute {} requests", executionId, request.numberOfActions()); } @Override public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { logger.debug("[{}] ok", executionId); } @Override public void afterBulk(long executionId, BulkRequest request, Throwable failure) { logger.warn("We have a problem", failure); bulk.add(request); } }) .setBulkActions(pageSize) .setFlushInterval(TimeValue.timeValueSeconds(5)) .build(); } -- David Pilato - Developer | Evangelist elastic.co @dadoonet <https://twitter.com/dadoonet> | @elasticsearchfr <https://twitter.com/elasticsearchfr> | @scrutmydocs <https://twitter.com/scrutmydocs> > Le 24 avr. 2015 à 10:59, mzrth_7810 <afrazmam...@gmail.com> a écrit : > > Hey guys, > > I'm using the BulkProcessor to index documents in elasticsearch. Its > definitely made my indexing throughput greater than it was before. > > Anyway, I was wondering if there were some best practices around exception > handling with the bulk processor. For example it would be good to schedule > retries in certain scenarios. > > At the moment all I'm doing is logging. I was wondering if someone could > point me to a resource with an example of handling a > NodeNotConnectedException and doing a retry. I don’t know how to access the > contents of the bulkProcessor from within the afterBulk method in the > Listener. > > > > public void beforeBulk(long executionId, BulkRequest bulkRequest) > { > } > > @Override > public void afterBulk(long executionId, BulkRequest bulkRequest, > BulkResponse bulkResponse) { > if (bulkResponse.hasFailures()) { > Log.error("We have failures"); > for (BulkItemResponse bulkItemResponse : > bulkResponse.getItems()) { > if (bulkItemResponse.isFailed()) { > Log.error(bulkItemResponse.getId() + " failed > with message: " + bulkItemResponse.getFailureMessage()); > } > } > } > } > > @Override > public void afterBulk(long executionId, BulkRequest bulkRequest, > Throwable t) { > Log.error("An exception occurred while indexing", t); > > // How do I add this back to the list of requests? > > } > > -- > You received this message because you are subscribed to the Google Groups > "elasticsearch" group. > To unsubscribe from this group and stop receiving emails from it, send an > email to elasticsearch+unsubscr...@googlegroups.com > <mailto:elasticsearch+unsubscr...@googlegroups.com>. > To view this discussion on the web visit > https://groups.google.com/d/msgid/elasticsearch/7e7d476d-6a8d-4a82-bbd0-e331a08d1bb4%40googlegroups.com > > <https://groups.google.com/d/msgid/elasticsearch/7e7d476d-6a8d-4a82-bbd0-e331a08d1bb4%40googlegroups.com?utm_medium=email&utm_source=footer>. > For more options, visit https://groups.google.com/d/optout > <https://groups.google.com/d/optout>. -- You received this message because you are subscribed to the Google Groups "elasticsearch" group. To unsubscribe from this group and stop receiving emails from it, send an email to elasticsearch+unsubscr...@googlegroups.com. To view this discussion on the web visit https://groups.google.com/d/msgid/elasticsearch/4ABE0924-A01E-421F-A5A6-D1A34A0EC236%40pilato.fr. For more options, visit https://groups.google.com/d/optout.