[ https://issues.apache.org/jira/browse/FLINK-8489?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Chesnay Schepler updated FLINK-8489:
------------------------------------
    Affects Version/s: 1.5.0

Data is not emitted by second ElasticSearch connector
-----------------------------------------------------

                 Key: FLINK-8489
                 URL: https://issues.apache.org/jira/browse/FLINK-8489
             Project: Flink
          Issue Type: Bug
          Components: ElasticSearch Connector
    Affects Versions: 1.4.0, 1.5.0
            Reporter: Fabian Hueske
            Assignee: Chesnay Schepler
            Priority: Critical

A user reported [this issue|https://lists.apache.org/thread.html/e91c71beb45d6df879effa16c52f2c71aa6ef1a54ef0a8ac4ccc72ee@%3Cuser.flink.apache.org%3E] on the user@f.a.o mailing list.

*Setup:*
* A program with two pipelines that write to ElasticSearch. The pipelines can be connected or completely separate.
* ElasticSearch 5.6.4, connector {{flink-connector-elasticsearch5_2.11}}

*Problem:*
Only one of the ES connectors emits data correctly. The other connector writes a single record and then stops emitting data (or writes no data at all). The problem does not exist if the second ES connector is replaced by a different connector (for example Cassandra).
Below is a program to reproduce the issue:

{code:java}
public class ElasticSearchTest1 {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // set elasticsearch connection details
        Map<String, String> config = new HashMap<>();
        config.put("bulk.flush.max.actions", "1");
        config.put("cluster.name", "<cluster name>");
        List<InetSocketAddress> transports = new ArrayList<>();
        transports.add(new InetSocketAddress(InetAddress.getByName("<host ip>"), 9300));

        // Set properties for Kafka Streaming
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "<host ip>" + ":9092");
        properties.setProperty("group.id", "testGroup");
        properties.setProperty("auto.offset.reset", "latest");

        // Create consumer for log records
        FlinkKafkaConsumer011<ObjectNode> inputConsumer1 =
            new FlinkKafkaConsumer011<>("elastic_test1", new JSONDeserializationSchema(), properties);

        DataStream<RecordOne> firstStream = env
            .addSource(inputConsumer1)
            .flatMap(new CreateRecordOne());

        firstStream
            .addSink(new ElasticsearchSink<RecordOne>(config, transports,
                new ElasticSearchOutputRecord("elastic_test_index1", "elastic_test_index1")));

        FlinkKafkaConsumer011<ObjectNode> inputConsumer2 =
            new FlinkKafkaConsumer011<>("elastic_test2", new JSONDeserializationSchema(), properties);

        DataStream<RecordTwo> secondStream = env
            .addSource(inputConsumer2)
            .flatMap(new CreateRecordTwo());

        secondStream
            .addSink(new ElasticsearchSink<RecordTwo>(config, transports,
                new ElasticSearchOutputRecord2("elastic_test_index2", "elastic_test_index2")));

        env.execute("Elastic Search Test");
    }
}

public class ElasticSearchOutputRecord implements ElasticsearchSinkFunction<RecordOne> {

    String index;
    String type;

    // Initialize filter function
    public ElasticSearchOutputRecord(String index, String type) {
        this.index = index;
        this.type = type;
    }

    // construct index request
    @Override
    public void process(RecordOne record, RuntimeContext ctx, RequestIndexer indexer) {
        // construct JSON document to index
        Map<String, String> json = new HashMap<>();
        json.put("item_one", record.item1);
        json.put("item_two", record.item2);

        IndexRequest rqst = Requests.indexRequest()
            .index(index)  // index name
            .type(type)    // mapping name
            .source(json);
        indexer.add(rqst);
    }
}

public class ElasticSearchOutputRecord2 implements ElasticsearchSinkFunction<RecordTwo> {

    String index;
    String type;

    // Initialize filter function
    public ElasticSearchOutputRecord2(String index, String type) {
        this.index = index;
        this.type = type;
    }

    // construct index request
    @Override
    public void process(RecordTwo record, RuntimeContext ctx, RequestIndexer indexer) {
        // construct JSON document to index
        Map<String, String> json = new HashMap<>();
        json.put("item_three", record.item3);
        json.put("item_four", record.item4);

        IndexRequest rqst = Requests.indexRequest()
            .index(index)  // index name
            .type(type)    // mapping name
            .source(json);
        indexer.add(rqst);
    }
}

public class CreateRecordOne implements FlatMapFunction<ObjectNode, RecordOne> {

    static final Logger log = LoggerFactory.getLogger(CreateRecordOne.class);

    @Override
    public void flatMap(ObjectNode value, Collector<RecordOne> out) throws Exception {
        try {
            out.collect(new RecordOne(value.get("item1").asText(), value.get("item2").asText()));
        } catch (Exception e) {
            log.error("error while creating RecordOne", e);
        }
    }
}

public class CreateRecordTwo implements FlatMapFunction<ObjectNode, RecordTwo> {

    static final Logger log = LoggerFactory.getLogger(CreateRecordTwo.class);

    @Override
    public void flatMap(ObjectNode value, Collector<RecordTwo> out) throws Exception {
        try {
            out.collect(new RecordTwo(value.get("item1").asText(), value.get("item2").asText()));
        } catch (Exception e) {
            log.error("error while creating RecordTwo", e);
        }
    }
}

public class RecordOne {

    public String item1;
    public String item2;

    public RecordOne() {}

    public RecordOne(String item1, String item2) {
        this.item1 = item1;
        this.item2 = item2;
    }
}

public class RecordTwo {

    public String item3;
    public String item4;

    public RecordTwo() {}

    public RecordTwo(String item3, String item4) {
        this.item3 = item3;
        this.item4 = item4;
    }
}
{code}

--
This message was sent by Atlassian JIRA (v7.6.3#76005)