tillrohrmann commented on a change in pull request #11006:
[FLINK-15868][kinesis] Resolve version conflict between jackson-core and
jackson-dataformat-cbor
URL: https://github.com/apache/flink/pull/11006#discussion_r374788499
##########
File path:
flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/testutils/SourceSinkDataTestKit.java
##########
@@ -66,28 +70,47 @@ public void cancel() {
}
}
- /**
- * A {@link ElasticsearchSinkFunction} that indexes each element it
receives to a specified Elasticsearch index.
- */
- public static class TestElasticsearchSinkFunction implements
ElasticsearchSinkFunction<Tuple2<Integer, String>> {
+ public static ElasticsearchSinkFunction<Tuple2<Integer, String>>
getCborSinkFunction(String index) {
+ return new TestElasticsearchSinkFunction(index,
XContentFactory::cborBuilder);
+ }
+
+ public static ElasticsearchSinkFunction<Tuple2<Integer, String>>
getJsonSinkFunction(String index) {
+ return new TestElasticsearchSinkFunction(index,
XContentFactory::jsonBuilder);
+ }
+
+ public static ElasticsearchSinkFunction<Tuple2<Integer, String>>
getSmileSinkFunction(String index) {
+ return new TestElasticsearchSinkFunction(index,
XContentFactory::smileBuilder);
+ }
+
+ public static ElasticsearchSinkFunction<Tuple2<Integer, String>>
getYamlSinkFunction(String index) {
+ return new TestElasticsearchSinkFunction(index,
XContentFactory::yamlBuilder);
+ }
+
+ private static class TestElasticsearchSinkFunction implements
ElasticsearchSinkFunction<Tuple2<Integer, String>> {
private static final long serialVersionUID = 1L;
private final String index;
+ private XContentBuilderProvider contentBuilderProvider;
/**
* Create the sink function, specifying a target Elasticsearch
index.
*
* @param index Name of the target Elasticsearch index.
*/
- public TestElasticsearchSinkFunction(String index) {
+ public TestElasticsearchSinkFunction(String index,
XContentBuilderProvider contentBuilderProvider) {
this.index = index;
+ this.contentBuilderProvider = contentBuilderProvider;
}
public IndexRequest createIndexRequest(Tuple2<Integer, String>
element) {
Map<String, Object> json = new HashMap<>();
json.put(DATA_FIELD_NAME, element.f1);
- return new IndexRequest(index, TYPE_NAME,
element.f0.toString()).source(json);
+ try {
+ return new IndexRequest(index, TYPE_NAME,
element.f0.toString()).source(contentBuilderProvider.getBuilder().map(json));
Review comment:
OK, I will rename it to `document`.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services