This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 2738307  [FLINK-18585][elasticsearch] Fix dynamic index doesn't work 
in new elasticsearch table sink
2738307 is described below

commit 273830730c9524278576255c56c17c195f7c6bf9
Author: Leonard Xu <[email protected]>
AuthorDate: Tue Jul 14 10:45:02 2020 +0800

    [FLINK-18585][elasticsearch] Fix dynamic index doesn't work in new 
elasticsearch table sink
    
    This closes #12886
---
 .../elasticsearch/table/IndexGeneratorFactory.java |  2 +-
 .../table/RowElasticsearchSinkFunction.java        |  5 +++
 .../table/Elasticsearch6DynamicSinkITCase.java     | 37 ++++++++++++++++++++++
 .../table/Elasticsearch7DynamicSinkITCase.java     | 37 ++++++++++++++++++++++
 4 files changed, 80 insertions(+), 1 deletion(-)

diff --git 
a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/IndexGeneratorFactory.java
 
b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/IndexGeneratorFactory.java
index e60be72..692b1fe 100644
--- 
a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/IndexGeneratorFactory.java
+++ 
b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/IndexGeneratorFactory.java
@@ -55,7 +55,7 @@ import java.util.regex.Pattern;
  * convert a field value of TIMESTAMP/DATE/TIME type into the format specified 
by date_format_string. The
  * date_format_string is compatible with {@link java.text.SimpleDateFormat}. 
For example, if the option
  * value is 'myusers_{log_ts|yyyy-MM-dd}', then a record with log_ts field 
value 2020-03-27 12:25:55 will
- * be written into "myusers-2020-03-27" index.
+ * be written into "myusers_2020-03-27" index.
  */
 @Internal
 final class IndexGeneratorFactory {
diff --git 
a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/RowElasticsearchSinkFunction.java
 
b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/RowElasticsearchSinkFunction.java
index 4eaba48..76701d8 100644
--- 
a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/RowElasticsearchSinkFunction.java
+++ 
b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/RowElasticsearchSinkFunction.java
@@ -69,6 +69,11 @@ class RowElasticsearchSinkFunction implements 
ElasticsearchSinkFunction<RowData>
        }
 
        @Override
+       public void open() {
+               indexGenerator.open();
+       }
+
+       @Override
        public void process(
                        RowData element,
                        RuntimeContext ctx,
diff --git 
a/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkITCase.java
 
b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkITCase.java
index 26cf90a..b306b34 100644
--- 
a/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkITCase.java
+++ 
b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkITCase.java
@@ -247,6 +247,43 @@ public class Elasticsearch6DynamicSinkITCase {
                assertThat(result, equalTo(expectedMap));
        }
 
+       @Test
+       public void testWritingDocumentsWithDynamicIndex() throws Exception {
+               TableEnvironment tableEnvironment = 
TableEnvironment.create(EnvironmentSettings.newInstance()
+                       .useBlinkPlanner()
+                       .inStreamingMode()
+                       .build());
+
+               String index = "dynamic-index-{b|yyyy-MM-dd}";
+               String myType = "MyType";
+               tableEnvironment.executeSql("CREATE TABLE esTable (" +
+                       "a BIGINT NOT NULL,\n" +
+                       "b TIMESTAMP NOT NULL,\n" +
+                       "PRIMARY KEY (a) NOT ENFORCED\n" +
+                       ")\n" +
+                       "WITH (\n" +
+                       String.format("'%s'='%s',\n", "connector", 
"elasticsearch-6") +
+                       String.format("'%s'='%s',\n", 
ElasticsearchOptions.INDEX_OPTION.key(), index) +
+                       String.format("'%s'='%s',\n", 
ElasticsearchOptions.DOCUMENT_TYPE_OPTION.key(), myType) +
+                       String.format("'%s'='%s',\n", 
ElasticsearchOptions.HOSTS_OPTION.key(), "http://127.0.0.1:9200";) +
+                       String.format("'%s'='%s'\n", 
ElasticsearchOptions.FLUSH_ON_CHECKPOINT_OPTION.key(), "false") +
+                       ")");
+
+               tableEnvironment.fromValues(row(1L, 
LocalDateTime.parse("2012-12-12T12:12:12")))
+                       .executeInsert("esTable")
+                       .getJobClient()
+                       .get()
+                       .getJobExecutionResult(this.getClass().getClassLoader())
+                       .get();
+
+               Client client = elasticsearchResource.getClient();
+               Map<String, Object> response = client.get(new 
GetRequest("dynamic-index-2012-12-12", myType, "1")).actionGet().getSource();
+               Map<Object, Object> expectedMap = new HashMap<>();
+               expectedMap.put("a", 1);
+               expectedMap.put("b", "2012-12-12 12:12:12");
+               assertThat(response, equalTo(expectedMap));
+       }
+
        private static class MockContext implements DynamicTableSink.Context {
                @Override
                public boolean isBounded() {
diff --git 
a/flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkITCase.java
 
b/flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkITCase.java
index 7f41eb7..483a783 100644
--- 
a/flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkITCase.java
+++ 
b/flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkITCase.java
@@ -239,6 +239,43 @@ public class Elasticsearch7DynamicSinkITCase {
                assertThat(result, equalTo(expectedMap));
        }
 
+       @Test
+       public void testWritingDocumentsWithDynamicIndex() throws Exception {
+               TableEnvironment tableEnvironment = 
TableEnvironment.create(EnvironmentSettings.newInstance()
+                       .useBlinkPlanner()
+                       .inStreamingMode()
+                       .build());
+
+               String index = "dynamic-index-{b|yyyy-MM-dd}";
+               tableEnvironment.executeSql("CREATE TABLE esTable (" +
+                       "a BIGINT NOT NULL,\n" +
+                       "b TIMESTAMP NOT NULL,\n" +
+                       "PRIMARY KEY (a) NOT ENFORCED\n" +
+                       ")\n" +
+                       "WITH (\n" +
+                       String.format("'%s'='%s',\n", "connector", 
"elasticsearch-7") +
+                       String.format("'%s'='%s',\n", 
ElasticsearchOptions.INDEX_OPTION.key(), index) +
+                       String.format("'%s'='%s',\n", 
ElasticsearchOptions.HOSTS_OPTION.key(), "http://127.0.0.1:9200";) +
+                       String.format("'%s'='%s'\n", 
ElasticsearchOptions.FLUSH_ON_CHECKPOINT_OPTION.key(), "false") +
+                       ")");
+
+               tableEnvironment.fromValues(row(1L, 
LocalDateTime.parse("2012-12-12T12:12:12")))
+                       .executeInsert("esTable")
+                       .getJobClient()
+                       .get()
+                       .getJobExecutionResult(this.getClass().getClassLoader())
+                       .get();
+
+               Client client = elasticsearchResource.getClient();
+               Map<String, Object> response = client.get(new 
GetRequest("dynamic-index-2012-12-12", "1"))
+                       .actionGet()
+                       .getSource();
+               Map<Object, Object> expectedMap = new HashMap<>();
+               expectedMap.put("a", 1);
+               expectedMap.put("b", "2012-12-12 12:12:12");
+               assertThat(response, equalTo(expectedMap));
+       }
+
        private static class MockContext implements DynamicTableSink.Context {
                @Override
                public boolean isBounded() {

Reply via email to