alpreu commented on a change in pull request #17363:
URL: https://github.com/apache/flink/pull/17363#discussion_r716642866



##########
File path: flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchProcessor.java
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.elasticsearch.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.connector.sink.SinkWriter;
+
+import org.elasticsearch.action.ActionRequest;
+
+import java.io.Serializable;
+
+/**
+ * Creates multiple {@link ActionRequest ActionRequests} from the incoming elements.

Review comment:
       It does not have to be multiple though; in theory it could be none, one, or multiple, right?
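       For illustration, a processor that emits none, one, or several requests per element could look roughly like this. This is only a sketch: `FanOutProcessor` is a made-up class, and I am assuming the two-argument `process(element, indexer)` shape from the javadoc example below.

```java
import org.elasticsearch.client.Requests;

import java.util.Collections;

class FanOutProcessor implements ElasticsearchProcessor<String> {

    @Override
    public void process(String element, RequestIndexer indexer) {
        if (element.isEmpty()) {
            return; // none: empty elements produce no request at all
        }
        for (String part : element.split(",")) {
            // one request per comma-separated part, so a single element can
            // fan out into multiple ActionRequests
            indexer.add(
                    Requests.indexRequest()
                            .index("my-index")
                            .id(part)
                            .source(Collections.singletonMap("data", part)));
        }
    }
}
```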

##########
File path: flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSink.java
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.elasticsearch.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.connector.sink.Committer;
+import org.apache.flink.api.connector.sink.GlobalCommitter;
+import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import org.apache.http.HttpHost;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Flink Sink to produce or update data in an Elasticsearch index. The sink supports the following
+ * delivery guarantees.
+ *
+ * <ul>
+ *   <li>{@link DeliveryGuarantee#NONE} does not provide any guarantees: actions are flushed to
+ *       Elasticsearch only depending on the configurations of the bulk processor. In case of a
+ *       failure it might happen that actions are lost if the bulk processor has still buffered
+ *       actions.
+ *   <li>{@link DeliveryGuarantee#AT_LEAST_ONCE} on a checkpoint the sink will wait until all
+ *       outstanding actions are flushed to and acknowledged by Elasticsearch. No actions will be

Review comment:
       Could change "outstanding" to "buffered" to keep it in line with the docs above.
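       For context, here is the at-least-once setup using the builder example from the `ElasticsearchSinkBuilder` javadoc in this PR (`MY_ELASTICSEARCH_HOSTS` and `MY_ELASTICSEARCH_PROCESSOR` are the placeholders used there):

```java
Elasticsearch<String> sink = Elasticsearch
    .builder()
    .setHosts(MY_ELASTICSEARCH_HOSTS)
    .setProcessor(MY_ELASTICSEARCH_PROCESSOR)
    // buffered actions are flushed to and acknowledged by Elasticsearch
    // on every checkpoint
    .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
    .build();
```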

##########
File path: flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchWriter.java
##########
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.elasticsearch.sink;
+
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.apache.http.HttpHost;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.elasticsearch.action.DocWriteRequest;
+import org.elasticsearch.action.bulk.BackoffPolicy;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.update.UpdateRequest;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestClientBuilder;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.common.unit.ByteSizeUnit;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.rest.RestStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+class ElasticsearchWriter<IN> implements SinkWriter<IN, Void, Void> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchWriter.class);
+
+    private final ElasticsearchProcessor<IN> processor;
+    private final MailboxExecutor mailboxExecutor;
+    private final boolean flushOnCheckpoint;
+    private final BulkProcessor bulkProcessor;
+    private final RestHighLevelClient client;
+    private final RequestIndexer requestIndexer;
+    private final Counter numBytesOutCounter;
+
+    private long pendingActions = 0;
+    private boolean checkpointInProgress = false;
+    private volatile long lastSendTime = 0;
+    private volatile long receiveTime = Long.MAX_VALUE;
+
+    /**
+     * Constructor creating an elasticsearch writer.
+     *
+     * <p>It will throw a {@link RuntimeException} if {@link ElasticsearchProcessor#open()} fails.

Review comment:
       Should be FlinkRuntimeException?
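       If `FlinkRuntimeException` is the intended behavior, the constructor could wrap the call roughly like this (a sketch only; the actual constructor body is not part of this hunk):

```java
try {
    processor.open();
} catch (Exception e) {
    throw new FlinkRuntimeException("Failed to open the ElasticsearchProcessor", e);
}
```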

##########
File path: flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSink.java
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.elasticsearch.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.connector.sink.Committer;
+import org.apache.flink.api.connector.sink.GlobalCommitter;
+import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import org.apache.http.HttpHost;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Flink Sink to produce or update data in an Elasticsearch index. The sink supports the following
+ * delivery guarantees.
+ *
+ * <ul>
+ *   <li>{@link DeliveryGuarantee#NONE} does not provide any guarantees: actions are flushed to
+ *       Elasticsearch only depending on the configurations of the bulk processor. In case of a
+ *       failure it might happen that actions are lost if the bulk processor has still buffered

Review comment:
       still has buffered actions

##########
File path: flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBuilder.java
##########
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.elasticsearch.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
+import org.apache.flink.util.InstantiationUtil;
+
+import org.apache.http.HttpHost;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Builder to construct {@link ElasticsearchSink}.

Review comment:
       Missing "a": should read "Builder to construct a {@link ElasticsearchSink}".

##########
File path: flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBuilder.java
##########
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.elasticsearch.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
+import org.apache.flink.util.InstantiationUtil;
+
+import org.apache.http.HttpHost;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Builder to construct {@link ElasticsearchSink}.
+ *
+ * <p>The following example shows the minimum setup to create a ElasticsearchSink that submits
+ * actions on checkpoint.
+ *
+ * <pre>{@code
+ * Elasticsearch<String> sink = Elasticsearch
+ *     .builder()
+ *     .setHosts(MY_ELASTICSEARCH_HOSTS)
+ *     .setProcessor(MY_ELASTICSEARCH_PROCESSOR)
+ *     .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
+ *     .build();
+ * }</pre>
+ *
+ * @param <IN> type of the records converted to Elasticsearch actions
+ */
+@PublicEvolving
+public class ElasticsearchSinkBuilder<IN> {
+
+    private int bulkFlushMaxActions = -1;
+    private int bulkFlushMaxMb = -1;
+    private long bulkFlushInterval = -1;
+    private ElasticsearchSinkBase.FlushBackoffType bulkFlushBackoffType;
+    private int bulkFlushBackoffRetries = -1;
+    private long bulkFlushBackOffDelay = -1;
+    private DeliveryGuarantee deliveryGuarantee = DeliveryGuarantee.NONE;
+    private List<HttpHost> hosts;
+    private ElasticsearchProcessor<? extends IN> processor;
+    private String username;
+    private String password;
+    private String connectionPathPrefix;
+
+    ElasticsearchSinkBuilder() {}
+
+    /**
+     * Sets the hosts where the Elasticsearch cluster nodes are reachable.
+     *
+     * @param hosts http addresses describing the node locations
+     */
+    public ElasticsearchSinkBuilder<IN> setHosts(HttpHost... hosts) {
+        checkNotNull(hosts);
+        checkState(hosts.length > 0, "Hosts cannot be empty.");
+        this.hosts = Arrays.asList(hosts);
+        return this;
+    }
+
+    /**
+     * Sets the processor which is invoked on every record to convert it to Elasticsearch actions.
+     *
+     * @param processor to process records into Elasticsearch actions.
+     * @return {@link ElasticsearchSinkBuilder}
+     */
+    public <T extends IN> ElasticsearchSinkBuilder<T> setProcessor(
+            ElasticsearchProcessor<? extends T> processor) {
+        checkNotNull(processor);
+        checkState(
+                InstantiationUtil.isSerializable(processor),
+                "The elasticsearch processor must be serializable.");
+        final ElasticsearchSinkBuilder<T> self = self();
+        self.processor = processor;
+        return self;
+    }
+
+    /**
+     * Sets the wanted the {@link DeliveryGuarantee}. The default delivery guarantee is {@link

Review comment:
       Typo: "Sets the wanted the" should read "Sets the wanted {@link DeliveryGuarantee}".

##########
File path: flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBuilder.java
##########
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.elasticsearch.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
+import org.apache.flink.util.InstantiationUtil;
+
+import org.apache.http.HttpHost;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Builder to construct {@link ElasticsearchSink}.
+ *
+ * <p>The following example shows the minimum setup to create a ElasticsearchSink that submits
+ * actions on checkpoint.
+ *
+ * <pre>{@code
+ * Elasticsearch<String> sink = Elasticsearch
+ *     .builder()
+ *     .setHosts(MY_ELASTICSEARCH_HOSTS)
+ *     .setProcessor(MY_ELASTICSEARCH_PROCESSOR)
+ *     .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
+ *     .build();
+ * }</pre>
+ *
+ * @param <IN> type of the records converted to Elasticsearch actions
+ */
+@PublicEvolving
+public class ElasticsearchSinkBuilder<IN> {
+
+    private int bulkFlushMaxActions = -1;
+    private int bulkFlushMaxMb = -1;
+    private long bulkFlushInterval = -1;
+    private ElasticsearchSinkBase.FlushBackoffType bulkFlushBackoffType;
+    private int bulkFlushBackoffRetries = -1;
+    private long bulkFlushBackOffDelay = -1;
+    private DeliveryGuarantee deliveryGuarantee = DeliveryGuarantee.NONE;
+    private List<HttpHost> hosts;
+    private ElasticsearchProcessor<? extends IN> processor;
+    private String username;
+    private String password;
+    private String connectionPathPrefix;
+
+    ElasticsearchSinkBuilder() {}
+
+    /**
+     * Sets the hosts where the Elasticsearch cluster nodes are reachable.
+     *
+     * @param hosts http addresses describing the node locations
+     */
+    public ElasticsearchSinkBuilder<IN> setHosts(HttpHost... hosts) {
+        checkNotNull(hosts);
+        checkState(hosts.length > 0, "Hosts cannot be empty.");
+        this.hosts = Arrays.asList(hosts);
+        return this;
+    }
+
+    /**
+     * Sets the processor which is invoked on every record to convert it to Elasticsearch actions.
+     *
+     * @param processor to process records into Elasticsearch actions.
+     * @return {@link ElasticsearchSinkBuilder}
+     */
+    public <T extends IN> ElasticsearchSinkBuilder<T> setProcessor(
+            ElasticsearchProcessor<? extends T> processor) {
+        checkNotNull(processor);
+        checkState(
+                InstantiationUtil.isSerializable(processor),
+                "The elasticsearch processor must be serializable.");
+        final ElasticsearchSinkBuilder<T> self = self();
+        self.processor = processor;
+        return self;
+    }
+
+    /**
+     * Sets the wanted the {@link DeliveryGuarantee}. The default delivery guarantee is {@link
+     * #deliveryGuarantee}.

Review comment:
       Missing default, or is this on purpose so comments do not get stale?
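       If spelling out the default is preferred, the javadoc could reference the enum constant rather than the field (the builder initializes the field to `DeliveryGuarantee.NONE` above):

```java
/**
 * Sets the wanted {@link DeliveryGuarantee}. The default delivery guarantee is
 * {@link DeliveryGuarantee#NONE}.
 */
```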

##########
File path: flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchWriterITCase.java
##########
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.elasticsearch.sink;
+
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.groups.OperatorIOMetricGroup;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
+import org.apache.flink.metrics.testutils.MetricListener;
+import org.apache.flink.runtime.metrics.groups.InternalSinkWriterMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.util.DockerImageVersions;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.function.ThrowingRunnable;
+
+import org.apache.http.HttpHost;
+import org.elasticsearch.ElasticsearchStatusException;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.elasticsearch.ElasticsearchContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Optional;
+
+import static org.apache.flink.connector.elasticsearch.sink.TestContext.buildMessage;
+import static org.apache.flink.runtime.testutils.CommonTestUtils.waitUntilCondition;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** Tests for {@link ElasticsearchWriter}. */
+@Testcontainers
+class ElasticsearchWriterITCase {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchWriterITCase.class);
+
+    @Container
+    private static final ElasticsearchContainer ES_CONTAINER =
+            new ElasticsearchContainer(DockerImageName.parse(DockerImageVersions.ELASTICSEARCH_7))
+                    .withLogConsumer(new Slf4jLogConsumer(LOG));
+
+    private RestHighLevelClient client;
+    private TestContext context;
+    private MetricListener metricListener;
+
+    @BeforeEach
+    void setUp() {
+        metricListener = new MetricListener();
+        client =
+                new RestHighLevelClient(
+                        RestClient.builder(HttpHost.create(ES_CONTAINER.getHttpHostAddress())));
+        context = new TestContext(client);
+    }
+
+    @AfterEach
+    void tearDown() throws IOException {
+        if (client != null) {
+            client.close();
+        }
+    }
+
+    @Test
+    void testWriteOnBulkFlush() throws Exception {
+        final String index = "test-bulk-flush-without-checkpoint";
+        final int flushAfterNActions = 5;
+        final BulkProcessorConfig bulkProcessorConfig =
+                new BulkProcessorConfig(flushAfterNActions, -1, -1, null, 0, 0);
+
+        try (final ElasticsearchWriter<Tuple2<Integer, String>> writer =
+                createWriter(index, false, bulkProcessorConfig)) {
+            writer.write(Tuple2.of(1, buildMessage(1)), null);
+            writer.write(Tuple2.of(2, buildMessage(2)), null);
+            writer.write(Tuple2.of(3, buildMessage(3)), null);
+            writer.write(Tuple2.of(4, buildMessage(4)), null);
+
+            // Ignore flush on checkpoint
+            writer.prepareCommit(true);
+            writer.prepareCommit(false);

Review comment:
       Why are both called in succession here?
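       If the intent is to show that neither variant flushes when `flushOnCheckpoint` is false (the `false` passed to `createWriter` above), a comment spelling that out would help, e.g.:

```java
// writer was created with flushOnCheckpoint = false, so neither call
// should trigger a bulk flush regardless of the flush flag
writer.prepareCommit(true);
writer.prepareCommit(false);
```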

##########
File path: flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchProcessor.java
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.elasticsearch.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.connector.sink.SinkWriter;
+
+import org.elasticsearch.action.ActionRequest;
+
+import java.io.Serializable;
+
+/**
+ * Creates multiple {@link ActionRequest ActionRequests} from the incoming elements.
+ *
+ * <p>This is used by sinks to prepare elements for sending them to Elasticsearch.
+ *
+ * <p>Example:
+ *
+ * <pre>{@code
+ * private static class TestElasticsearchProcessor implements ElasticsearchProcessor<Tuple2<Integer, String>> {
+ *
+ *     public IndexRequest createIndexRequest(Tuple2<Integer, String> element) {
+ *         Map<String, Object> json = new HashMap<>();
+ *         json.put("data", element.f1);
+ *
+ *         return Requests.indexRequest()
+ *                 .index("my-index")
+ *                 .type("my-type")
+ *                 .id(element.f0.toString())
+ *                 .source(json);
+ *     }
+ *
+ *     public void process(Tuple2<Integer, String> element, RequestIndexer indexer) {
+ *         indexer.add(createIndexRequest(element));
+ *     }
+ * }
+ *
+ * }</pre>
+ *
+ * @param <T> The type of the element handled by this {@link ElasticsearchProcessor}
+ */
+@PublicEvolving
+public interface ElasticsearchProcessor<T> extends Function, Serializable {
+
+    /**
+     * Initialization method for the function. It is called once before the actual working process
+     * methods.
+     */
+    default void open() throws Exception {}
+
+    /** Tear-down method for the function. It is called when the sink closes. */
+    default void close() throws Exception {}
+
+    /**
+     * Process the incoming element to produce multiple {@link ActionRequest ActionsRequests}. The

Review comment:
       Same as above; also a typo in "ActionsRequests" (should be "ActionRequests").
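       A possible rewording that covers both the cardinality and the stray "s" (just a suggestion):

```java
/**
 * Process the incoming element to produce none, one, or multiple {@link ActionRequest
 * ActionRequests}.
 */
```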




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

