This is an automated email from the ASF dual-hosted git repository.
nicoloboschi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 97a88255717 [fix][pulsar-io] Fix memory leak in ElasticSearch sink (#15125)
97a88255717 is described below
commit 97a882557179071b6e039e5fd1d76b2f18dfa8c1
Author: Nicolò Boschi <[email protected]>
AuthorDate: Wed May 4 17:35:44 2022 +0200
[fix][pulsar-io] Fix memory leak in ElasticSearch sink (#15125)
---
.../io/elasticsearch/ElasticSearchClient.java | 21 ++------
.../io/elasticsearch/client/BulkProcessor.java | 5 +-
.../client/elastic/ElasticBulkProcessor.java | 40 +++++++-------
.../opensearch/OpenSearchHighLevelRestClient.java | 63 +++++++++++++++++-----
.../io/elasticsearch/ElasticSearchClientTests.java | 28 +++++++++-
.../io/elasticsearch/ElasticSearchSinkTests.java | 43 +++++++++++++--
6 files changed, 146 insertions(+), 54 deletions(-)
diff --git a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClient.java b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClient.java
index 21ec35c7158..dcbdfbc64fb 100644
--- a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClient.java
+++ b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClient.java
@@ -28,9 +28,6 @@ import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;
@@ -56,9 +53,7 @@ public class ElasticSearchClient implements AutoCloseable {
final Set<String> indexCache = new HashSet<>();
final Map<String, String> topicToIndexCache = new HashMap<>();
- final ConcurrentMap<Long, Record> records = new ConcurrentHashMap<>();
final AtomicReference<Exception> irrecoverableError = new
AtomicReference<>();
- final AtomicLong bulkOperationIdGenerator = new AtomicLong();
private final IndexNameFormatter indexNameFormatter;
public ElasticSearchClient(ElasticSearchConfig elasticSearchConfig) {
@@ -69,9 +64,6 @@ public class ElasticSearchClient implements AutoCloseable {
this.indexNameFormatter = null;
}
final BulkProcessor.Listener bulkListener = new
BulkProcessor.Listener() {
- private Record
removeAndGetRecordForOperation(BulkProcessor.BulkOperationRequest operation) {
- return records.remove(operation.getOperationId());
- }
@Override
public void afterBulk(long executionId,
List<BulkProcessor.BulkOperationRequest> bulkOperationList,
@@ -81,7 +73,7 @@ public class ElasticSearchClient implements AutoCloseable {
}
int index = 0;
for (BulkProcessor.BulkOperationResult result: results) {
- final Record record =
removeAndGetRecordForOperation(bulkOperationList.get(index++));
+ final Record record =
bulkOperationList.get(index++).getPulsarRecord();
if (result.isError()) {
record.fail();
checkForIrrecoverableError(result);
@@ -95,7 +87,7 @@ public class ElasticSearchClient implements AutoCloseable {
public void afterBulk(long executionId,
List<BulkProcessor.BulkOperationRequest> bulkOperationList, Throwable
throwable) {
log.warn("Bulk request id={} failed:", executionId, throwable);
for (BulkProcessor.BulkOperationRequest operation:
bulkOperationList) {
- final Record record =
removeAndGetRecordForOperation(operation);
+ final Record record = operation.getPulsarRecord();
record.fail();
}
}
@@ -150,15 +142,12 @@ public class ElasticSearchClient implements AutoCloseable {
final String documentId = idAndDoc.getLeft();
final String documentSource = idAndDoc.getRight();
- final long operationId =
bulkOperationIdGenerator.incrementAndGet();
final BulkProcessor.BulkIndexRequest bulkIndexRequest =
BulkProcessor.BulkIndexRequest.builder()
.index(indexName)
.documentId(documentId)
.documentSource(documentSource)
- .requestId(operationId)
+ .record(record)
.build();
-
- records.put(operationId, record);
client.getBulkProcessor().appendIndexRequest(bulkIndexRequest);
} catch (Exception e) {
log.debug("index failed id=" + idAndDoc.getLeft(), e);
@@ -203,14 +192,12 @@ public class ElasticSearchClient implements AutoCloseable {
checkIndexExists(record);
final String indexName = indexName(record);
- final long operationId =
bulkOperationIdGenerator.incrementAndGet();
final BulkProcessor.BulkDeleteRequest bulkDeleteRequest =
BulkProcessor.BulkDeleteRequest.builder()
.index(indexName)
.documentId(id)
- .requestId(operationId)
+ .record(record)
.build();
- records.put(operationId, record);
client.getBulkProcessor().appendDeleteRequest(bulkDeleteRequest);
} catch (Exception e) {
log.debug("delete failed id: {}", id, e);
diff --git a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/BulkProcessor.java b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/BulkProcessor.java
index 39d7ea8391b..709d4212390 100644
--- a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/BulkProcessor.java
+++ b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/BulkProcessor.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import java.util.List;
import lombok.Builder;
import lombok.Getter;
+import org.apache.pulsar.functions.api.Record;
/**
* Processor for "bulk" call to the Elastic REST Endpoint.
@@ -32,7 +33,7 @@ public interface BulkProcessor extends Closeable {
@Builder
@Getter
class BulkOperationRequest {
- private long operationId;
+ private Record pulsarRecord;
}
@Builder
@@ -57,6 +58,7 @@ public interface BulkProcessor extends Closeable {
@Builder
@Getter
class BulkIndexRequest {
+ private Record record;
private long requestId;
private String index;
private String documentId;
@@ -66,6 +68,7 @@ public interface BulkProcessor extends Closeable {
@Builder
@Getter
class BulkDeleteRequest {
+ private Record record;
private long requestId;
private String index;
private String documentId;
diff --git a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticBulkProcessor.java b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticBulkProcessor.java
index 70edd353fc4..3754c0c7acb 100644
--- a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticBulkProcessor.java
+++ b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticBulkProcessor.java
@@ -44,6 +44,7 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.elasticsearch.ElasticSearchConfig;
import org.apache.pulsar.io.elasticsearch.RandomExponentialRetry;
import org.apache.pulsar.io.elasticsearch.client.BulkProcessor;
@@ -56,7 +57,7 @@ public class ElasticBulkProcessor implements BulkProcessor {
private final AtomicLong executionIdGen = new AtomicLong();
private final int bulkActions;
private final long bulkSize;
- private final List<BulkOperationWithId> pendingOperations = new
ArrayList<>();
+ private final List<BulkOperationWithPulsarRecord> pendingOperations = new
ArrayList<>();
private final BulkRequestHandler bulkRequestHandler;
private volatile boolean closed = false;
private final ReentrantLock lock;
@@ -99,7 +100,7 @@ public class ElasticBulkProcessor implements BulkProcessor {
if (config.getBulkSizeInMb() > 0) {
sourceLength =
request.getDocumentSource().getBytes(StandardCharsets.UTF_8).length;
}
- add(BulkOperationWithId.indexOperation(indexOperation,
request.getRequestId(), sourceLength));
+ add(BulkOperationWithPulsarRecord.indexOperation(indexOperation,
request.getRecord(), sourceLength));
}
@Override
@@ -108,7 +109,7 @@ public class ElasticBulkProcessor implements BulkProcessor {
.index(request.getIndex())
.id(request.getDocumentId())
.build();
- add(BulkOperationWithId.deleteOperation(deleteOperation,
request.getRequestId()));
+ add(BulkOperationWithPulsarRecord.deleteOperation(deleteOperation,
request.getRecord()));
}
protected void ensureOpen() {
@@ -169,7 +170,7 @@ public class ElasticBulkProcessor implements BulkProcessor {
execute(false);
}
- public void add(BulkOperationWithId bulkOperation) {
+ public void add(BulkOperationWithPulsarRecord bulkOperation) {
lock.lock();
try {
ensureOpen();
@@ -202,7 +203,7 @@ public class ElasticBulkProcessor implements BulkProcessor {
}
}
- public static class BulkOperationWithId extends BulkOperation {
+ public static class BulkOperationWithPulsarRecord extends BulkOperation {
/**
* REQUEST_OVERHEAD:
@@ -211,29 +212,30 @@ public class ElasticBulkProcessor implements BulkProcessor {
*/
private static final int REQUEST_OVERHEAD = 50;
- public static BulkOperationWithId indexOperation(IndexOperation
indexOperation,
- long operationId,
- long sourceLength) {
+ public static BulkOperationWithPulsarRecord
indexOperation(IndexOperation indexOperation,
+ Record
pulsarRecord,
+ long
sourceLength) {
long estimatedSizeInBytes = REQUEST_OVERHEAD + sourceLength;
- return new BulkOperationWithId(indexOperation, operationId,
estimatedSizeInBytes);
+ return new BulkOperationWithPulsarRecord(indexOperation,
pulsarRecord, estimatedSizeInBytes);
}
- public static BulkOperationWithId deleteOperation(DeleteOperation
indexOperation,
- long operationId) {
- return new BulkOperationWithId(indexOperation, operationId,
REQUEST_OVERHEAD);
+ public static BulkOperationWithPulsarRecord
deleteOperation(DeleteOperation indexOperation,
+ Record
pulsarRecord) {
+ return new BulkOperationWithPulsarRecord(indexOperation,
pulsarRecord, REQUEST_OVERHEAD);
}
- private final long operationId;
+ private final Record pulsarRecord;
private final long estimatedSizeInBytes;
- public BulkOperationWithId(BulkOperationVariant value, long
operationId, long estimatedSizeInBytes) {
+ public BulkOperationWithPulsarRecord(BulkOperationVariant value,
+ Record pulsarRecord, long
estimatedSizeInBytes) {
super(value);
- this.operationId = operationId;
+ this.pulsarRecord = pulsarRecord;
this.estimatedSizeInBytes = estimatedSizeInBytes;
}
- public long getOperationId() {
- return operationId;
+ public Record getPulsarRecord() {
+ return pulsarRecord;
}
public long getEstimatedSizeInBytes() {
@@ -328,9 +330,9 @@ public class ElasticBulkProcessor implements BulkProcessor {
private List<BulkOperationRequest> convertBulkRequest(BulkRequest
bulkRequest) {
return bulkRequest.operations().stream().map(op -> {
- BulkOperationWithId opWithId = (BulkOperationWithId)
op;
+ BulkOperationWithPulsarRecord opWithRecord =
(BulkOperationWithPulsarRecord) op;
return BulkOperationRequest.builder()
- .operationId(opWithId.getOperationId())
+ .pulsarRecord(opWithRecord.getPulsarRecord())
.build();
}).collect(Collectors.toList());
}
diff --git a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/opensearch/OpenSearchHighLevelRestClient.java b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/opensearch/OpenSearchHighLevelRestClient.java
index b48f103c4f1..1a939774c4c 100644
--- a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/opensearch/OpenSearchHighLevelRestClient.java
+++ b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/opensearch/OpenSearchHighLevelRestClient.java
@@ -23,18 +23,15 @@ import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpHost;
+import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.elasticsearch.ElasticSearchConfig;
import org.apache.pulsar.io.elasticsearch.RandomExponentialRetry;
import org.apache.pulsar.io.elasticsearch.client.BulkProcessor;
import org.apache.pulsar.io.elasticsearch.client.RestClient;
-import org.elasticsearch.client.Node;
-import org.opensearch.action.DocWriteRequest;
import org.opensearch.action.DocWriteResponse;
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
import org.opensearch.action.admin.indices.refresh.RefreshRequest;
@@ -65,9 +62,43 @@ import org.opensearch.search.builder.SearchSourceBuilder;
@Slf4j
public class OpenSearchHighLevelRestClient extends RestClient implements
BulkProcessor {
+ private interface DocWriteRequestWithPulsarRecord {
+ Record getPulsarRecord();
+ }
+
+ private static class IndexRequestWithPulsarRecord extends IndexRequest
implements DocWriteRequestWithPulsarRecord {
+ private Record pulsarRecord;
+
+ public IndexRequestWithPulsarRecord(String index, Record pulsarRecord)
{
+ super(index);
+ this.pulsarRecord = pulsarRecord;
+ }
+
+ @Override
+ public Record getPulsarRecord() {
+ return pulsarRecord;
+ }
+ }
+
+
+ private static class DeleteRequestWithPulsarRecord
+ extends DeleteRequest
+ implements DocWriteRequestWithPulsarRecord {
+ private Record pulsarRecord;
+
+ public DeleteRequestWithPulsarRecord(String index, Record
pulsarRecord) {
+ super(index);
+ this.pulsarRecord = pulsarRecord;
+ }
+
+ @Override
+ public Record getPulsarRecord() {
+ return pulsarRecord;
+ }
+ }
+
private RestHighLevelClient client;
private org.opensearch.action.bulk.BulkProcessor internalBulkProcessor;
- private final ConcurrentMap<DocWriteRequest<?>, Long> bulkRequestMappings
= new ConcurrentHashMap<>();
public OpenSearchHighLevelRestClient(ElasticSearchConfig
elasticSearchConfig,
BulkProcessor.Listener
bulkProcessorListener) {
@@ -83,7 +114,8 @@ public class OpenSearchHighLevelRestClient extends RestClient implements BulkPro
.setSocketTimeout(config.getSocketTimeoutInMs()))
.setHttpClientConfigCallback(this.configCallback)
.setFailureListener(new
org.opensearch.client.RestClient.FailureListener() {
- public void onFailure(Node node) {
+ @Override
+ public void onFailure(org.opensearch.client.Node node) {
log.warn("Node host={} failed", node.getHost());
}
});
@@ -101,9 +133,17 @@ public class OpenSearchHighLevelRestClient extends RestClient implements BulkPro
private
List<BulkProcessor.BulkOperationRequest>
convertBulkRequest(BulkRequest
bulkRequest) {
return
bulkRequest.requests().stream().map(docWriteRequest -> {
- long requestId =
bulkRequestMappings.get(docWriteRequest);
+ final Record pulsarRecord;
+ if (docWriteRequest instanceof
DocWriteRequestWithPulsarRecord) {
+ DocWriteRequestWithPulsarRecord
requestWithId =
+
(DocWriteRequestWithPulsarRecord) docWriteRequest;
+ pulsarRecord =
requestWithId.getPulsarRecord();
+ } else {
+ throw new
UnsupportedOperationException("Unexpected bulk request of type: "
+ +
docWriteRequest.getClass());
+ }
return
BulkProcessor.BulkOperationRequest.builder()
- .operationId(requestId)
+ .pulsarRecord(pulsarRecord)
.build();
}).collect(Collectors.toList());
}
@@ -235,25 +275,22 @@ public class OpenSearchHighLevelRestClient extends RestClient implements BulkPro
return this;
}
-
@Override
public void appendIndexRequest(BulkProcessor.BulkIndexRequest request)
throws IOException {
- IndexRequest indexRequest = Requests.indexRequest(request.getIndex());
+ IndexRequest indexRequest = new
IndexRequestWithPulsarRecord(request.getIndex(), request.getRecord());
if (!Strings.isNullOrEmpty(request.getDocumentId())) {
indexRequest.id(request.getDocumentId());
}
indexRequest.type(config.getTypeName());
indexRequest.source(request.getDocumentSource(), XContentType.JSON);
- bulkRequestMappings.put(indexRequest, request.getRequestId());
internalBulkProcessor.add(indexRequest);
}
@Override
public void appendDeleteRequest(BulkProcessor.BulkDeleteRequest request)
throws IOException {
- DeleteRequest deleteRequest =
Requests.deleteRequest(request.getIndex());
+ DeleteRequest deleteRequest = new
DeleteRequestWithPulsarRecord(request.getIndex(), request.getRecord());
deleteRequest.id(request.getDocumentId());
deleteRequest.type(config.getTypeName());
- bulkRequestMappings.put(deleteRequest, request.getRequestId());
internalBulkProcessor.add(deleteRequest);
}
diff --git a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java
index 7ac86cd2ef0..7ade25046f1 100644
--- a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java
+++ b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java
@@ -346,7 +346,6 @@ public abstract class ElasticSearchClientTests extends ElasticSearchTestBase {
Awaitility.await().untilAsserted(() -> {
assertEquals(mockRecord.acked, 15);
assertEquals(mockRecord.failed, 0);
- assertEquals(client.records.size(), 0);
});
} finally {
@@ -356,4 +355,31 @@ public abstract class ElasticSearchClientTests extends ElasticSearchTestBase {
}
}
+ @Test
+ public void testBulkIndexAndDelete() throws Exception {
+ final String index = "indexbulktest-" + UUID.randomUUID();
+ ElasticSearchConfig config = new ElasticSearchConfig()
+ .setElasticSearchUrl("http://"+container.getHttpHostAddress())
+ .setIndexName(index)
+ .setBulkEnabled(true)
+ .setBulkActions(10)
+ .setBulkFlushIntervalInMs(-1L);
+
+ try (ElasticSearchClient client = new ElasticSearchClient(config)) {
+ assertTrue(client.createIndexIfNeeded(index));
+ MockRecord<GenericObject> mockRecord = new MockRecord<>();
+ for (int i = 0; i < 5; i++) {
+ client.bulkIndex(mockRecord, Pair.of("key" + i, "{\"a\":" + i
+ "}"));
+ client.bulkDelete(mockRecord, "key" + i);
+ }
+ assertEquals(mockRecord.acked, 10);
+ assertEquals(mockRecord.failed, 0);
+ assertEquals(client.getRestClient().totalHits(index), 0);
+ // no effect
+ client.flush();
+
+ assertEquals(mockRecord.acked, 10);
+ }
+ }
+
}
diff --git a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java
index 6ffed0f4277..5c0651f3720 100644
--- a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java
+++ b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java
@@ -19,11 +19,13 @@
package org.apache.pulsar.io.elasticsearch;
import co.elastic.clients.transport.ElasticsearchTransport;
+import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.schema.GenericObject;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.GenericSchema;
+import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.apache.pulsar.common.schema.SchemaType;
@@ -35,11 +37,13 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.TimeUnit;
import java.util.HashMap;
import java.util.List;
+import java.util.Locale;
import java.util.Map;
import java.util.Optional;
-
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.elasticsearch.client.BulkProcessor;
@@ -60,8 +64,6 @@ import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
-import java.util.Locale;
-import java.util.concurrent.TimeUnit;
import static org.testng.Assert.assertNull;
@@ -207,6 +209,41 @@ public abstract class ElasticSearchSinkTests extends ElasticSearchTestBase {
}
@Test
+ public final void sendNoSchemaTest() throws Exception {
+
+ when(mockRecord.getMessage()).thenAnswer(new
Answer<Optional<Message<String>>>() {
+ @Override
+ public Optional<Message<String>> answer(InvocationOnMock
invocation) throws Throwable {
+ final MessageImpl mock = mock(MessageImpl.class);
+
when(mock.getData()).thenReturn("{\"a\":1}".getBytes(StandardCharsets.UTF_8));
+ return Optional.of(mock);
+ }
+ });
+
+ when(mockRecord.getKey()).thenAnswer(new Answer<Optional<String>>() {
+ public Optional<String> answer(InvocationOnMock invocation) throws
Throwable {
+ return null;
+ }});
+
+
+ when(mockRecord.getValue()).thenAnswer(new Answer<String>() {
+ public String answer(InvocationOnMock invocation) throws Throwable
{
+ return "hello";
+ }});
+
+ when(mockRecord.getSchema()).thenAnswer(new Answer<Schema>() {
+ public Schema answer(InvocationOnMock invocation) throws Throwable
{
+ return Schema.STRING;
+ }});
+
+ map.put("indexName", "test-index");
+ map.put("schemaEnable", "false");
+ sink.open(map, mockSinkContext);
+ sink.write(mockRecord);
+ verify(mockRecord, times(1)).ack();
+ }
+
+ @Test(enabled = true)
public final void sendKeyIgnoreSingleField() throws Exception {
final String index = "testkeyignore";
map.put("indexName", index);