iduanyingjie created FLINK-30809:
------------------------------------
Summary: flink-connector-elasticsearch7 updates data pipeline does
not work
Key: FLINK-30809
URL: https://issues.apache.org/jira/browse/FLINK-30809
Project: Flink
Issue Type: Bug
Components: Connectors / ElasticSearch
Affects Versions: elasticsearch-3.0.0
Environment: Flink Version: 1.15.3
Mysql Version: 5.7
Elasticsearch Version: 7.17.7
Reporter: iduanyingjie
Start Elasticsearch and Kibana in Docker with docker-compose:
{code:yaml}
version: '2.1'
services:
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.17.7
    environment:
      - cluster.name=docker-cluster
      - bootstrap.memory_lock=true
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
      - discovery.type=single-node
    ports:
      - "9200:9200"
      - "9300:9300"
    ulimits:
      memlock:
        soft: -1
        hard: -1
      nofile:
        soft: 65536
        hard: 65536
  kibana:
    image: docker.elastic.co/kibana/kibana:7.17.7
    ports:
      - "5601:5601"
{code}
Create the records table in MySQL (in the test database):
{code:sql}
-- utf8mb4_0900_ai_ci exists only in MySQL 8.0; utf8mb4_general_ci also works on MySQL 5.7
CREATE TABLE records (
  id bigint unsigned NOT NULL AUTO_INCREMENT,
  user_id bigint unsigned NOT NULL,
  create_time datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
  PRIMARY KEY (id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
{code}
Insert some data:
{code:sql}
INSERT INTO test.records (id, user_id, create_time) VALUES (default, 123, '2023-01-20 12:25:11');
INSERT INTO test.records (id, user_id, create_time) VALUES (default, 456, '2023-01-20 12:25:30');
INSERT INTO test.records (id, user_id, create_time) VALUES (default, 789, '2023-01-20 12:25:37');
{code}
Create an ingest pipeline in Elasticsearch that sets an ingest_timestamp field on every document it processes:
{code}
PUT /_ingest/pipeline/set_ingest_timestamp_fields
{
  "processors": [
    {
      "set": {
        "field": "ingest_timestamp",
        "value": "{{_ingest.timestamp}}"
      }
    }
  ]
}
{code}
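To make the pipeline's effect concrete, here is a minimal Python model of what the set processor does to each document it handles: it copies the ingest time into ingest_timestamp. This is a simplification written for illustration only; the function name apply_set_ingest_timestamp is hypothetical, and in Elasticsearch the {{_ingest.timestamp}} template is rendered by the ingest node itself (with sub-second precision, which this sketch does not reproduce).

{code:python}
from datetime import datetime, timezone


def apply_set_ingest_timestamp(source: dict, ingest_time: datetime) -> dict:
    """Hypothetical model of the `set` processor in set_ingest_timestamp_fields.

    Returns a copy of the document source with `ingest_timestamp` set to the
    time the document passed through the pipeline (ISO 8601, UTC, "Z" suffix).
    """
    doc = dict(source)  # leave the caller's dict untouched
    utc = ingest_time.astimezone(timezone.utc)
    doc["ingest_timestamp"] = utc.isoformat().replace("+00:00", "Z")
    return doc


# Usage: stamp the row with id 3 as if it were ingested at 2023-01-28 05:21:40 UTC
row = {"id": 3, "user_id": 789, "create_time": "2023-01-20 12:25:37"}
stamped = apply_set_ingest_timestamp(row, datetime(2023, 1, 28, 5, 21, 40, tzinfo=timezone.utc))
print(stamped["ingest_timestamp"])  # 2023-01-28T05:21:40Z
{code}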
Create the enriched_records index in Elasticsearch with this pipeline as its default_pipeline:
{code}
PUT enriched_records
{
  "settings": {
    "default_pipeline": "set_ingest_timestamp_fields",
    "number_of_shards": "1",
    "number_of_replicas": "0"
  }
}
{code}
Execute the following Flink SQL:
{code:sql}
CREATE TABLE records (
  id INT,
  user_id INT,
  create_time TIMESTAMP(3),
  proc_time AS PROCTIME(),
  operation_time TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'port' = '3306',
  'username' = 'root',
  'password' = '123456',
  'database-name' = 'test',
  'table-name' = 'records',
  'server-time-zone' = 'UTC'
);

CREATE TABLE enriched_records (
  id INT,
  user_id INT,
  create_time TIMESTAMP(3),
  proc_time TIMESTAMP_LTZ(3),
  operation_time TIMESTAMP_LTZ(3),
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'elasticsearch-7',
  'hosts' = 'http://localhost:9200',
  'index' = 'enriched_records'
);

INSERT INTO enriched_records
SELECT
  o.id,
  o.user_id,
  o.create_time,
  o.proc_time,
  o.operation_time
FROM records AS o;
{code}
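How the sink turns changelog rows into Elasticsearch writes matters for this report. As we understand the upsert mode of the elasticsearch-7 SQL sink (an assumption worth checking against the connector source), the declared PRIMARY KEY (id) becomes the document _id (consistent with "_id":"3" in the search results), INSERT and UPDATE_AFTER rows are sent as an update request that carries the full row both as the partial doc and as the upsert document, and DELETE rows become delete requests. The toy Python model below illustrates that mapping; EsRequest and to_request are hypothetical names, not connector or client APIs.

{code:python}
from dataclasses import dataclass
from typing import Optional


@dataclass
class EsRequest:
    """Hypothetical, simplified stand-in for one Elasticsearch bulk action."""
    action: str                     # "update" or "delete"
    index: str
    doc_id: str
    doc: Optional[dict] = None      # partial document merged into an existing doc
    upsert: Optional[dict] = None   # document indexed if the id does not exist yet


def to_request(kind: str, row: dict, index: str = "enriched_records") -> Optional[EsRequest]:
    """Map one Flink changelog row (RowKind short string) to a request.

    Assumption: upsert-mode semantics keyed on PRIMARY KEY (id).
    """
    doc_id = str(row["id"])
    if kind in ("+I", "+U"):
        # Inserts and updates alike: update request with the row as doc and upsert
        return EsRequest("update", index, doc_id, doc=dict(row), upsert=dict(row))
    if kind == "-D":
        return EsRequest("delete", index, doc_id)
    # -U (UPDATE_BEFORE) is not expected in upsert mode; the following +U carries the new row
    return None


# Usage: the MySQL UPDATE of id 3 arrives as an UPDATE_AFTER row
req = to_request("+U", {"id": 3, "user_id": 987})
print(req.action, req.doc_id)  # update 3
{code}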
Querying Elasticsearch with GET /enriched_records/_search shows that every document has an ingest_timestamp field holding the time it was written, for example:
{code:json}
{
  "_index": "enriched_records",
  "_type": "_doc",
  "_id": "3",
  "_score": 1,
  "_source": {
    "operation_time": "1970-01-01 00:00:00Z",
    "create_time": "2023-01-20 12:25:37",
    "user_id": 789,
    "ingest_timestamp": "2023-01-28T05:21:40.539754251Z",
    "id": 3,
    "proc_time": "2023-01-28 05:21:40.233Z"
  }
}
{code}
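We then update a record in MySQL (user_id of the row with id 3 changes from 789 to 987). The document in Elasticsearch is updated accordingly, but its ingest_timestamp value does not change, so the default pipeline configured for this index does not seem to run on this write: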
{code:json}
{
  "_index": "enriched_records",
  "_type": "_doc",
  "_id": "3",
  "_score": 1,
  "_source": {
    "operation_time": "2023-01-28 05:25:05Z",
    "create_time": "2023-01-20 12:25:37",
    "user_id": 987,
    "ingest_timestamp": "2023-01-28T05:21:40.539754251Z",
    "id": 3,
    "proc_time": "2023-01-28 05:25:05.529Z"
  }
}
{code}
In contrast, if we modify the document directly in Elasticsearch, the ingest_timestamp value does change, i.e. the pipeline runs for that write.
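The observations above are consistent with the following hypothesis, modeled in a small Python sketch: Elasticsearch applies the index's default_pipeline to full index requests, and to the upsert document when an update request creates a new document, but not when an update request merges a partial doc into an existing document. This is our reading of the symptoms, not confirmed Elasticsearch internals, and it assumes the connector writes rows as update requests with an upsert document and that the direct write in Elasticsearch was a full index request. ToyIndex and stamp are hypothetical names; t1 and t2 come from the search results above, while t3 is an arbitrary later time.

{code:python}
from datetime import datetime, timezone


def stamp(source: dict, now: datetime) -> dict:
    """Stand-in for set_ingest_timestamp_fields: add the ingest time."""
    doc = dict(source)
    doc["ingest_timestamp"] = now.isoformat()
    return doc


class ToyIndex:
    """Hypothetical model of an index that has a default ingest pipeline."""

    def __init__(self) -> None:
        self.docs = {}

    def index(self, doc_id: str, source: dict, now: datetime) -> None:
        # Full index request: the default pipeline runs and the source is replaced.
        self.docs[doc_id] = stamp(source, now)

    def update(self, doc_id: str, doc: dict, upsert: dict, now: datetime) -> None:
        if doc_id not in self.docs:
            # Document missing: the upsert document is indexed, pipeline included.
            self.docs[doc_id] = stamp(upsert, now)
        else:
            # Document exists: the partial doc is merged without running the pipeline
            # (hypothesis). A shallow merge suffices for these flat documents.
            self.docs[doc_id].update(doc)


t1 = datetime(2023, 1, 28, 5, 21, 40, tzinfo=timezone.utc)  # snapshot write
t2 = datetime(2023, 1, 28, 5, 25, 5, tzinfo=timezone.utc)   # MySQL UPDATE arrives
t3 = datetime(2023, 1, 28, 5, 30, 0, tzinfo=timezone.utc)   # direct write (illustrative)

idx = ToyIndex()
idx.update("3", {"id": 3, "user_id": 789}, {"id": 3, "user_id": 789}, t1)  # doc created
idx.update("3", {"id": 3, "user_id": 987}, {"id": 3, "user_id": 987}, t2)  # doc merged
after_update = idx.docs["3"]["ingest_timestamp"]
idx.index("3", {"id": 3, "user_id": 987}, t3)  # direct write in Elasticsearch
print(after_update)                       # 2023-01-28T05:21:40+00:00
print(idx.docs["3"]["ingest_timestamp"])  # 2023-01-28T05:30:00+00:00
{code}

If this hypothesis holds, the pipeline is skipped because of the request type the sink uses, not because the pipeline is misconfigured.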