alpreu commented on a change in pull request #17363: URL: https://github.com/apache/flink/pull/17363#discussion_r716642866
########## File path: flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchProcessor.java ########## @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.elasticsearch.sink; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.functions.Function; +import org.apache.flink.api.connector.sink.SinkWriter; + +import org.elasticsearch.action.ActionRequest; + +import java.io.Serializable; + +/** + * Creates multiple {@link ActionRequest ActionRequests} from the incoming elements. Review comment: It does not have to be multiple though, in theory it could be none, one, or multiple, right? ########## File path: flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSink.java ########## @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.elasticsearch.sink; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.connector.sink.Committer; +import org.apache.flink.api.connector.sink.GlobalCommitter; +import org.apache.flink.api.connector.sink.Sink; +import org.apache.flink.api.connector.sink.SinkWriter; +import org.apache.flink.connector.base.DeliveryGuarantee; +import org.apache.flink.core.io.SimpleVersionedSerializer; + +import org.apache.http.HttpHost; + +import java.io.IOException; +import java.util.List; +import java.util.Optional; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * Flink Sink to produce or update data in an Elasticsearch index. The sink supports the following + * delivery guarantees. + * + * <ul> + * <li>{@link DeliveryGuarantee#NONE} does not provide any guarantees: actions are flushed to + * Elasticsearch only depending on the configurations of the bulk processor. In case of a + * failure it might happen that actions are lost if the bulk processor has still buffered + * actions. + * <li>{@link DeliveryGuarantee#AT_LEAST_ONCE} on a checkpoint the sink will wait until all + * outstanding actions are flushed to and acknowledged by Elasticsearch. No actions will be Review comment: Could change outstanding to buffered to keep in line with the docs above ########## File path: flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchWriter.java ########## @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.elasticsearch.sink; + +import org.apache.flink.api.common.operators.MailboxExecutor; +import org.apache.flink.api.connector.sink.SinkWriter; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.groups.SinkWriterMetricGroup; +import org.apache.flink.util.FlinkRuntimeException; + +import org.apache.http.HttpHost; +import org.apache.http.auth.AuthScope; +import org.apache.http.auth.UsernamePasswordCredentials; +import org.apache.http.client.CredentialsProvider; +import org.apache.http.impl.client.BasicCredentialsProvider; +import org.elasticsearch.action.DocWriteRequest; +import org.elasticsearch.action.bulk.BackoffPolicy; +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.bulk.BulkProcessor; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.delete.DeleteRequest; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.update.UpdateRequest; +import org.elasticsearch.client.RequestOptions; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.RestClientBuilder; +import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.rest.RestStatus; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +class ElasticsearchWriter<IN> implements SinkWriter<IN, Void, Void> { + + private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchWriter.class); + + private final ElasticsearchProcessor<IN> processor; + private final MailboxExecutor mailboxExecutor; + private final boolean flushOnCheckpoint; + private final BulkProcessor bulkProcessor; + private final RestHighLevelClient client; + private final RequestIndexer requestIndexer; + private final Counter numBytesOutCounter; + + private long pendingActions = 0; + private boolean checkpointInProgress = false; + private volatile long lastSendTime = 0; + private volatile long receiveTime = Long.MAX_VALUE; + + /** + * Constructor creating an elasticsearch writer. + * + * <p>It will throw a {@link RuntimeException} if {@link ElasticsearchProcessor#open()} fails. Review comment: Should be FlinkRuntimeException? ########## File path: flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSink.java ########## @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.elasticsearch.sink; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.connector.sink.Committer; +import org.apache.flink.api.connector.sink.GlobalCommitter; +import org.apache.flink.api.connector.sink.Sink; +import org.apache.flink.api.connector.sink.SinkWriter; +import org.apache.flink.connector.base.DeliveryGuarantee; +import org.apache.flink.core.io.SimpleVersionedSerializer; + +import org.apache.http.HttpHost; + +import java.io.IOException; +import java.util.List; +import java.util.Optional; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * Flink Sink to produce or update data in an Elasticsearch index. The sink supports the following + * delivery guarantees. + * + * <ul> + * <li>{@link DeliveryGuarantee#NONE} does not provide any guarantees: actions are flushed to + * Elasticsearch only depending on the configurations of the bulk processor. In case of a + * failure it might happen that actions are lost if the bulk processor has still buffered Review comment: still has buffered actions ########## File path: flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBuilder.java ########## @@ -0,0 +1,276 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.elasticsearch.sink; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.connector.base.DeliveryGuarantee; +import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase; +import org.apache.flink.util.InstantiationUtil; + +import org.apache.http.HttpHost; + +import java.util.Arrays; +import java.util.List; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * Builder to construct {@link ElasticsearchSink}. Review comment: missing a ########## File path: flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBuilder.java ########## @@ -0,0 +1,276 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.elasticsearch.sink; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.connector.base.DeliveryGuarantee; +import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase; +import org.apache.flink.util.InstantiationUtil; + +import org.apache.http.HttpHost; + +import java.util.Arrays; +import java.util.List; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * Builder to construct {@link ElasticsearchSink}. + * + * <p>The following example shows the minimum setup to create a ElasticsearchSink that submits + * actions on checkpoint. + * + * <pre>{@code + * Elasticsearch<String> sink = Elasticsearch + * .builder() + * .setHosts(MY_ELASTICSEARCH_HOSTS) + * .setProcessor(MY_ELASTICSEARCH_PROCESSOR) + * .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE) + * .build(); + * }</pre> + * + * @param <IN> type of the records converted to Elasticsearch actions + */ +@PublicEvolving +public class ElasticsearchSinkBuilder<IN> { + + private int bulkFlushMaxActions = -1; + private int bulkFlushMaxMb = -1; + private long bulkFlushInterval = -1; + private ElasticsearchSinkBase.FlushBackoffType bulkFlushBackoffType; + private int bulkFlushBackoffRetries = -1; + private long bulkFlushBackOffDelay = -1; + private DeliveryGuarantee deliveryGuarantee = DeliveryGuarantee.NONE; + private List<HttpHost> hosts; + private ElasticsearchProcessor<? extends IN> processor; + private String username; + private String password; + private String connectionPathPrefix; + + ElasticsearchSinkBuilder() {} + + /** + * Sets the hosts where the Elasticsearch cluster nodes are reachable. + * + * @param hosts http addresses describing the node locations + */ + public ElasticsearchSinkBuilder<IN> setHosts(HttpHost... hosts) { + checkNotNull(hosts); + checkState(hosts.length > 0, "Hosts cannot be empty."); + this.hosts = Arrays.asList(hosts); + return this; + } + + /** + * Sets the processor which is invoked on every record to convert it to Elasticsearch actions. + * + * @param processor to process records into Elasticsearch actions. + * @return {@link ElasticsearchSinkBuilder} + */ + public <T extends IN> ElasticsearchSinkBuilder<T> setProcessor( + ElasticsearchProcessor<? extends T> processor) { + checkNotNull(processor); + checkState( + InstantiationUtil.isSerializable(processor), + "The elasticsearch processor must be serializable."); + final ElasticsearchSinkBuilder<T> self = self(); + self.processor = processor; + return self; + } + + /** + * Sets the wanted the {@link DeliveryGuarantee}. The default delivery guarantee is {@link Review comment: Sets the wanted {@link DeliveryGuarantee ########## File path: flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBuilder.java ########## @@ -0,0 +1,276 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.elasticsearch.sink; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.connector.base.DeliveryGuarantee; +import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase; +import org.apache.flink.util.InstantiationUtil; + +import org.apache.http.HttpHost; + +import java.util.Arrays; +import java.util.List; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * Builder to construct {@link ElasticsearchSink}. + * + * <p>The following example shows the minimum setup to create a ElasticsearchSink that submits + * actions on checkpoint. + * + * <pre>{@code + * Elasticsearch<String> sink = Elasticsearch + * .builder() + * .setHosts(MY_ELASTICSEARCH_HOSTS) + * .setProcessor(MY_ELASTICSEARCH_PROCESSOR) + * .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE) + * .build(); + * }</pre> + * + * @param <IN> type of the records converted to Elasticsearch actions + */ +@PublicEvolving +public class ElasticsearchSinkBuilder<IN> { + + private int bulkFlushMaxActions = -1; + private int bulkFlushMaxMb = -1; + private long bulkFlushInterval = -1; + private ElasticsearchSinkBase.FlushBackoffType bulkFlushBackoffType; + private int bulkFlushBackoffRetries = -1; + private long bulkFlushBackOffDelay = -1; + private DeliveryGuarantee deliveryGuarantee = DeliveryGuarantee.NONE; + private List<HttpHost> hosts; + private ElasticsearchProcessor<? extends IN> processor; + private String username; + private String password; + private String connectionPathPrefix; + + ElasticsearchSinkBuilder() {} + + /** + * Sets the hosts where the Elasticsearch cluster nodes are reachable. + * + * @param hosts http addresses describing the node locations + */ + public ElasticsearchSinkBuilder<IN> setHosts(HttpHost... hosts) { + checkNotNull(hosts); + checkState(hosts.length > 0, "Hosts cannot be empty."); + this.hosts = Arrays.asList(hosts); + return this; + } + + /** + * Sets the processor which is invoked on every record to convert it to Elasticsearch actions. + * + * @param processor to process records into Elasticsearch actions. + * @return {@link ElasticsearchSinkBuilder} + */ + public <T extends IN> ElasticsearchSinkBuilder<T> setProcessor( + ElasticsearchProcessor<? extends T> processor) { + checkNotNull(processor); + checkState( + InstantiationUtil.isSerializable(processor), + "The elasticsearch processor must be serializable."); + final ElasticsearchSinkBuilder<T> self = self(); + self.processor = processor; + return self; + } + + /** + * Sets the wanted the {@link DeliveryGuarantee}. The default delivery guarantee is {@link + * #deliveryGuarantee}. Review comment: missing default or is this on purpose so comments do not get stale? ########## File path: flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchWriterITCase.java ########## @@ -0,0 +1,265 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.elasticsearch.sink; + +import org.apache.flink.api.common.operators.MailboxExecutor; +import org.apache.flink.api.common.time.Deadline; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.groups.OperatorIOMetricGroup; +import org.apache.flink.metrics.groups.SinkWriterMetricGroup; +import org.apache.flink.metrics.testutils.MetricListener; +import org.apache.flink.runtime.metrics.groups.InternalSinkWriterMetricGroup; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; +import org.apache.flink.util.DockerImageVersions; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.function.ThrowingRunnable; + +import org.apache.http.HttpHost; +import org.elasticsearch.ElasticsearchStatusException; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.RestHighLevelClient; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.elasticsearch.ElasticsearchContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; + +import java.io.IOException; +import java.time.Duration; +import java.util.Collections; +import java.util.Optional; + +import static org.apache.flink.connector.elasticsearch.sink.TestContext.buildMessage; +import static org.apache.flink.runtime.testutils.CommonTestUtils.waitUntilCondition; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.greaterThan; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** Tests for {@link ElasticsearchWriter}. */ +@Testcontainers +class ElasticsearchWriterITCase { + + private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchWriterITCase.class); + + @Container + private static final ElasticsearchContainer ES_CONTAINER = + new ElasticsearchContainer(DockerImageName.parse(DockerImageVersions.ELASTICSEARCH_7)) + .withLogConsumer(new Slf4jLogConsumer(LOG)); + + private RestHighLevelClient client; + private TestContext context; + private MetricListener metricListener; + + @BeforeEach + void setUp() { + metricListener = new MetricListener(); + client = + new RestHighLevelClient( + RestClient.builder(HttpHost.create(ES_CONTAINER.getHttpHostAddress()))); + context = new TestContext(client); + } + + @AfterEach + void tearDown() throws IOException { + if (client != null) { + client.close(); + } + } + + @Test + void testWriteOnBulkFlush() throws Exception { + final String index = "test-bulk-flush-without-checkpoint"; + final int flushAfterNActions = 5; + final BulkProcessorConfig bulkProcessorConfig = + new BulkProcessorConfig(flushAfterNActions, -1, -1, null, 0, 0); + + try (final ElasticsearchWriter<Tuple2<Integer, String>> writer = + createWriter(index, false, bulkProcessorConfig)) { + writer.write(Tuple2.of(1, buildMessage(1)), null); + writer.write(Tuple2.of(2, buildMessage(2)), null); + writer.write(Tuple2.of(3, buildMessage(3)), null); + writer.write(Tuple2.of(4, buildMessage(4)), null); + + // Ignore flush on checkpoint + writer.prepareCommit(true); + writer.prepareCommit(false); Review comment: Why are both called in succession here? ########## File path: flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchProcessor.java ########## @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.elasticsearch.sink; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.functions.Function; +import org.apache.flink.api.connector.sink.SinkWriter; + +import org.elasticsearch.action.ActionRequest; + +import java.io.Serializable; + +/** + * Creates multiple {@link ActionRequest ActionRequests} from the incoming elements. + * + * <p>This is used by sinks to prepare elements for sending them to Elasticsearch. + * + * <p>Example: + * + * <pre>{@code + * private static class TestElasticsearchProcessor implements ElasticsearchProcessor<Tuple2<Integer, String>> { + * + * public IndexRequest createIndexRequest(Tuple2<Integer, String> element) { + * Map<String, Object> json = new HashMap<>(); + * json.put("data", element.f1); + * + * return Requests.indexRequest() + * .index("my-index") + * .type("my-type") + * .id(element.f0.toString()) + * .source(json); + * } + * + * public void process(Tuple2<Integer, String> element, RequestIndexer indexer) { + * indexer.add(createIndexRequest(element)); + * } + * } + * + * }</pre> + * + * @param <T> The type of the element handled by this {@link ElasticsearchProcessor} + */ +@PublicEvolving +public interface ElasticsearchProcessor<T> extends Function, Serializable { + + /** + * Initialization method for the function. It is called once before the actual working process + * methods. + */ + default void open() throws Exception {} + + /** Tear-down method for the function. It is called when the sink closes. */ + default void close() throws Exception {} + + /** + * Process the incoming element to produce multiple {@link ActionRequest ActionsRequests}. The Review comment: Same as above, also typo in Action*s*requests -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
