Fabian Hueske created FLINK-8489:
------------------------------------

             Summary: Data is not emitted by second ElasticSearch connector
                 Key: FLINK-8489
                 URL: https://issues.apache.org/jira/browse/FLINK-8489
             Project: Flink
          Issue Type: Bug
          Components: ElasticSearch Connector
    Affects Versions: 1.4.0
            Reporter: Fabian Hueske


A user reported [this 
issue|https://lists.apache.org/thread.html/e91c71beb45d6df879effa16c52f2c71aa6ef1a54ef0a8ac4ccc72ee@%3Cuser.flink.apache.org%3E]
 on the user@flink.apache.org mailing list.

*Setup:*
 * A program with two pipelines that write to ElasticSearch. The pipelines can 
be connected or completely separate.
 * ElasticSearch 5.6.4, connector {{flink-connector-elasticsearch5_2.11}}

*Problem:*
 Only one of the ES connectors correctly emits data. The other connector writes 
a single record and then stops emitting data (or does not write any data at 
all). The problem does not occur if the second ES connector is replaced by a 
different connector (for example Cassandra).

Below is a program to reproduce the issue:
{code:java}
public class ElasticSearchTest1 {

        public static void main(String[] args) throws Exception {
                
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
                
                // set elasticsearch connection details 
                Map<String, String> config = new HashMap<>();
                config.put("bulk.flush.max.actions", "1");
                config.put("cluster.name", "<cluster name>");
                List<InetSocketAddress> transports = new ArrayList<>();         
                transports.add(new 
InetSocketAddress(InetAddress.getByName("<host ip>"), 9300));
                
                //Set properties for Kafka Streaming
                Properties properties = new Properties();
                properties.setProperty("bootstrap.servers", "<host 
ip>"+":9092");
                properties.setProperty("group.id", "testGroup");
                properties.setProperty("auto.offset.reset", "latest");  
                                
                //Create consumer for log records
                
                FlinkKafkaConsumer011 inputConsumer1 = new 
FlinkKafkaConsumer011<>("elastic_test1", new JSONDeserializationSchema(), 
properties);
                                
                DataStream<RecordOne> firstStream = env
                                .addSource(inputConsumer1)
                                .flatMap(new CreateRecordOne());
                        
                firstStream             
                .addSink(new ElasticsearchSink<RecordOne>(config, 
                                transports, 
                                new 
ElasticSearchOutputRecord("elastic_test_index1","elastic_test_index1")));
                
                FlinkKafkaConsumer011 inputConsumer2 = new 
FlinkKafkaConsumer011<>("elastic_test2", new JSONDeserializationSchema(), 
properties);
                
                DataStream<RecordTwo> secondStream = env
                                        .addSource(inputConsumer2)              
                                        .flatMap(new CreateRecordTwo());
                        
                secondStream            
                .addSink(new ElasticsearchSink<RecordTwo>(config, 
                                transports, 
                                new 
ElasticSearchOutputRecord2("elastic_test_index2","elastic_test_index2")));
                                
                env.execute("Elastic Search Test");
        }
}

/**
 * Sink function that converts each {@code RecordOne} into one Elasticsearch index request.
 */
public class ElasticSearchOutputRecord implements ElasticsearchSinkFunction<RecordOne> {

    /** Value passed to {@code index(...)} on every request. */
    String index;

    /** Value passed to {@code type(...)} on every request. */
    String type;

    /**
     * Stores the index and type used for all requests built by this function.
     *
     * @param index index name
     * @param type mapping name
     */
    public ElasticSearchOutputRecord(String index, String type) {
        this.index = index;
        this.type = type;
    }

    /**
     * Builds a two-field document from {@code record} and hands the resulting index request to
     * {@code indexer}.
     *
     * @param record element whose {@code item1}/{@code item2} fields are written
     * @param ctx runtime context (not read)
     * @param indexer receives the constructed request
     */
    @Override
    public void process(RecordOne record, RuntimeContext ctx, RequestIndexer indexer) {
        // Document body: item1 -> "item_one", item2 -> "item_two".
        Map<String, String> document = new HashMap<>();
        document.put("item_one", record.item1);
        document.put("item_two", record.item2);

        indexer.add(
                Requests.indexRequest()
                        .index(this.index)
                        .type(this.type)
                        .source(document));
    }
}

/**
 * Sink function that converts each {@code RecordTwo} into one Elasticsearch index request.
 */
public class ElasticSearchOutputRecord2 implements ElasticsearchSinkFunction<RecordTwo> {

    /** Value passed to {@code index(...)} on every request. */
    String index;

    /** Value passed to {@code type(...)} on every request. */
    String type;

    /**
     * Stores the index and type used for all requests built by this function.
     *
     * @param index index name
     * @param type mapping name
     */
    public ElasticSearchOutputRecord2(String index, String type) {
        this.index = index;
        this.type = type;
    }

    /**
     * Builds a two-field document from {@code record} and hands the resulting index request to
     * {@code indexer}.
     *
     * @param record element whose {@code item3}/{@code item4} fields are written
     * @param ctx runtime context (not read)
     * @param indexer receives the constructed request
     */
    @Override
    public void process(RecordTwo record, RuntimeContext ctx, RequestIndexer indexer) {
        // Document body: item3 -> "item_three", item4 -> "item_four".
        Map<String, String> fields = new HashMap<>();
        fields.put("item_three", record.item3);
        fields.put("item_four", record.item4);

        IndexRequest request = Requests.indexRequest();
        request.index(index).type(type).source(fields);
        indexer.add(request);
    }
}

/**
 * Maps a JSON node to a {@code RecordOne} built from its "item1" and "item2" values.
 */
public class CreateRecordOne implements FlatMapFunction<ObjectNode, RecordOne> {

    static final Logger log = LoggerFactory.getLogger(CreateRecordOne.class);

    /**
     * Emits one {@code RecordOne} per input node.
     *
     * <p>Any exception thrown while reading the two values or emitting the record is logged at
     * error level and not rethrown.
     *
     * @param value input JSON node
     * @param out collector receiving the created record
     */
    @Override
    public void flatMap(ObjectNode value, Collector<RecordOne> out) throws Exception {
        try {
            String first = value.get("item1").asText();
            String second = value.get("item2").asText();
            out.collect(new RecordOne(first, second));
        } catch (Exception e) {
            log.error("error while creating RecordOne", e);
        }
    }
}

/**
 * Maps a JSON node to a {@code RecordTwo} built from its "item1" and "item2" values.
 *
 * <p>NOTE(review): the keys read here are "item1"/"item2" although the record's fields are named
 * item3/item4 -- confirm against the schema of the messages on the second topic.
 */
public class CreateRecordTwo implements FlatMapFunction<ObjectNode, RecordTwo> {

    static final Logger log = LoggerFactory.getLogger(CreateRecordTwo.class);

    /**
     * Emits one {@code RecordTwo} per input node.
     *
     * <p>Any exception thrown while reading the two values or emitting the record is logged at
     * error level and not rethrown.
     *
     * @param value input JSON node
     * @param out collector receiving the created record
     */
    @Override
    public void flatMap(ObjectNode value, Collector<RecordTwo> out) throws Exception {
        try {
            String first = value.get("item1").asText();
            String second = value.get("item2").asText();
            RecordTwo created = new RecordTwo(first, second);
            out.collect(created);
        } catch (Exception e) {
            log.error("error while creating RecordTwo", e);
        }
    }
}

/**
 * Plain data holder with two public, mutable string fields and a public no-argument
 * constructor.
 */
public class RecordOne {

    /** First value; {@code null} until assigned. */
    public String item1;

    /** Second value; {@code null} until assigned. */
    public String item2;

    /** Creates an instance with both fields left {@code null}. */
    public RecordOne() {
    }

    /**
     * Creates an instance with both fields set.
     *
     * @param item1 value stored in {@link #item1}
     * @param item2 value stored in {@link #item2}
     */
    public RecordOne(String item1, String item2) {
        this.item1 = item1;
        this.item2 = item2;
    }
}

/**
 * Plain data holder with two public, mutable string fields and a public no-argument
 * constructor.
 */
public class RecordTwo {

    /** First value; {@code null} until assigned. */
    public String item3;

    /** Second value; {@code null} until assigned. */
    public String item4;

    /** Creates an instance with both fields left {@code null}. */
    public RecordTwo() {
    }

    /**
     * Creates an instance with both fields set.
     *
     * @param item3 value stored in {@link #item3}
     * @param item4 value stored in {@link #item4}
     */
    public RecordTwo(String item3, String item4) {
        this.item3 = item3;
        this.item4 = item4;
    }
}
{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to