This is an automated email from the ASF dual-hosted git repository.
xiangying pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new 0eaa6717ed4 [branch-2.10][test]Run and fix tests (#19636)
0eaa6717ed4 is described below
commit 0eaa6717ed4fc9c1a2d09a64caeba0b2e163ae03
Author: Xiangying Meng <[email protected]>
AuthorDate: Sun Feb 26 18:35:00 2023 +0800
[branch-2.10][test]Run and fix tests (#19636)
Co-authored-by: Cong Zhao <[email protected]>
Co-authored-by: fengyubiao <[email protected]>
Co-authored-by: AloysZhang <[email protected]>
---
.../pulsar/tests/MockitoCleanupListener.java | 3 +-
.../mledger/util/PositionAckSetUtil.java | 7 +
.../broker/admin/impl/PersistentTopicsBase.java | 3 +-
.../loadbalance/impl/ModularLoadManagerImpl.java | 2 +-
.../broker/service/AbstractBaseDispatcher.java | 13 ++
.../pulsar/broker/web/PulsarWebResource.java | 2 +-
.../namespace/NamespaceOwnershipListenerTests.java | 5 +-
.../client/impl/TransactionEndToEndTest.java | 78 +++++++
.../admin/internal/PulsarAdminBuilderImplTest.java | 2 -
.../BitSetRecyclableRecyclableTest.java | 18 ++
.../pulsar/functions/instance/InstanceCache.java | 1 -
.../worker/ClusterServiceCoordinator.java | 3 -
.../elastic/ElasticSearchJavaRestClient.java | 224 ---------------------
.../io/sinks/ElasticSearch7SinkTester.java | 41 ----
.../io/sinks/ElasticSearch8SinkTester.java | 43 ----
.../integration/io/sinks/OpenSearchSinkTester.java | 88 --------
16 files changed, 124 insertions(+), 409 deletions(-)
diff --git
a/buildtools/src/main/java/org/apache/pulsar/tests/MockitoCleanupListener.java
b/buildtools/src/main/java/org/apache/pulsar/tests/MockitoCleanupListener.java
index 73fff1bb7e2..dcc5707afba 100644
---
a/buildtools/src/main/java/org/apache/pulsar/tests/MockitoCleanupListener.java
+++
b/buildtools/src/main/java/org/apache/pulsar/tests/MockitoCleanupListener.java
@@ -39,7 +39,8 @@ public class MockitoCleanupListener extends
BetweenTestClassesListenerAdapter {
protected void onBetweenTestClasses(Class<?> endedTestClass, Class<?>
startedTestClass) {
if (MOCKITO_CLEANUP_ENABLED) {
if (MockitoThreadLocalStateCleaner.INSTANCE.isEnabled()) {
- LOG.info("Cleaning up Mockito's
ThreadSafeMockingProgress.MOCKING_PROGRESS_PROVIDER thread local state.");
+ LOG.info("Cleaning up Mockito's
ThreadSafeMockingProgress.MOCKING_PROGRESS_PROVIDER "
+ + "thread local state.");
MockitoThreadLocalStateCleaner.INSTANCE.cleanup();
}
cleanupMockitoInline();
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/PositionAckSetUtil.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/PositionAckSetUtil.java
index da3043e7458..d1bc95c768a 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/PositionAckSetUtil.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/PositionAckSetUtil.java
@@ -60,6 +60,13 @@ public class PositionAckSetUtil {
return ackSet;
}
+ public static boolean isAckSetEmpty(long[] ackSet) {
+ BitSetRecyclable bitSet =
BitSetRecyclable.create().resetWords(ackSet);
+ boolean isEmpty = bitSet.isEmpty();
+ bitSet.recycle();
+ return isEmpty;
+ }
+
//This method is compare two position which position is bigger than
another one.
//When the ledgerId and entryId in this position is same to another one
and two position all have ack set, it will
//compare the ack set next bit index is bigger than another one.
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 1be743a7ee9..51525303ac0 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -5068,7 +5068,8 @@ public class PersistentTopicsBase extends AdminResource {
// Redirect the request to the appropriate broker if this broker
is not the owner of the topic
validateTopicOwnership(topicName, authoritative);
- getReplicatedSubscriptionStatusFromLocalBroker(topicName,
subName).get();
+ Map res =
getReplicatedSubscriptionStatusFromLocalBroker(topicName, subName).get();
+ asyncResponse.resume(res);
} catch (Exception e) {
log.error("[{}] Failed to get replicated subscription status on {}
{}", clientAppId(),
topicName, subName, e);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
index 1beb8ec7415..d81f6949f43 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
@@ -64,8 +64,8 @@ import
org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared.BrokerTopicLo
import org.apache.pulsar.broker.resources.ClusterResources;
import org.apache.pulsar.broker.stats.prometheus.metrics.Summary;
import org.apache.pulsar.client.admin.PulsarAdminException;
-import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.client.util.ExecutorProvider;
+import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundleFactory;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.ServiceUnitId;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
index 6b9ddcc1162..0b5108eeab8 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
@@ -20,6 +20,7 @@
package org.apache.pulsar.broker.service;
import static org.apache.bookkeeper.mledger.util.PositionAckSetUtil.andAckSet;
+import static
org.apache.bookkeeper.mledger.util.PositionAckSetUtil.isAckSetEmpty;
import com.google.common.collect.ImmutableList;
import io.netty.buffer.ByteBuf;
import java.util.ArrayList;
@@ -246,6 +247,18 @@ public abstract class AbstractBaseDispatcher implements
Dispatcher {
// if actSet is null, use pendingAck ackSet
ackSet = positionInPendingAck.getAckSet();
}
+ // if the result of pendingAckSet(in
pendingAckHandle) AND the ackSet(in cursor) is empty
+ // filter this entry
+ if (isAckSetEmpty(ackSet)) {
+ entries.set(i, null);
+ entry.release();
+ continue;
+ }
+ } else {
+ // filter non-batch message in pendingAck state
+ entries.set(i, null);
+ entry.release();
+ continue;
}
}
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index 68e5b7da824..976ba5d328e 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -21,10 +21,10 @@ package org.apache.pulsar.broker.web;
import static com.google.common.base.Preconditions.checkArgument;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.commons.lang3.StringUtils.isBlank;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.benmanes.caffeine.cache.CacheLoader;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
-import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.BoundType;
import com.google.common.collect.Lists;
import com.google.common.collect.Range;
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceOwnershipListenerTests.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceOwnershipListenerTests.java
index c0a46881628..5de86180716 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceOwnershipListenerTests.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceOwnershipListenerTests.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.broker.namespace;
import com.google.common.collect.Sets;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -30,11 +31,9 @@ import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
-
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.testng.Assert.assertTrue;
@@ -55,7 +54,7 @@ public class NamespaceOwnershipListenerTests extends
BrokerTestBase {
}
@Test
- public void testNamespaceBundleOwnershipListener() throws
PulsarAdminException, InterruptedException, PulsarClientException {
+ public void testNamespaceBundleOwnershipListener() throws Exception {
final CountDownLatch countDownLatch = new CountDownLatch(2);
final AtomicBoolean onLoad = new AtomicBoolean(false);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
index b0a4b28bbdc..a51ad6998ed 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
@@ -171,6 +171,84 @@ public class TransactionEndToEndTest extends
TransactionTestBase {
assertNull(consumer.receive(2, TimeUnit.SECONDS));
}
+
+ @Test(dataProvider="enableBatch")
+ private void testFilterMsgsInPendingAckStateWhenConsumerDisconnect(boolean
enableBatch) throws Exception {
+ final String topicName = NAMESPACE1 +
"/testFilterMsgsInPendingAckStateWhenConsumerDisconnect-" + enableBatch;
+ final int count = 10;
+
+ @Cleanup
+ Producer<Integer> producer = null;
+ if (enableBatch) {
+ producer = pulsarClient
+ .newProducer(Schema.INT32)
+ .topic(topicName)
+ .enableBatching(true)
+ .batchingMaxPublishDelay(1, TimeUnit.HOURS)
+ .batchingMaxMessages(count).create();
+ } else {
+ producer = pulsarClient
+ .newProducer(Schema.INT32)
+ .topic(topicName)
+ .enableBatching(false).create();
+ }
+
+ @Cleanup
+ Consumer<Integer> consumer = pulsarClient
+ .newConsumer(Schema.INT32)
+ .topic(topicName)
+ .isAckReceiptEnabled(true)
+ .subscriptionName("test")
+ .subscriptionType(SubscriptionType.Shared)
+ .enableBatchIndexAcknowledgment(true)
+ .subscribe();
+
+ for (int i = 0; i < count; i++) {
+ producer.sendAsync(i);
+ }
+
+ Transaction txn1 = getTxn();
+
+ Transaction txn2 = getTxn();
+
+
+ // txn1 ack half of messages and don't end the txn1
+ for (int i = 0; i < count / 2; i++) {
+ consumer.acknowledgeAsync(consumer.receive().getMessageId(),
txn1).get();
+ }
+
+ // txn2 ack the rest half of messages and commit tnx2
+ for (int i = count / 2; i < count; i++) {
+ consumer.acknowledgeAsync(consumer.receive().getMessageId(),
txn2).get();
+ }
+ // commit txn2
+ txn2.commit().get();
+
+ // close and re-create consumer
+ consumer.close();
+ consumer = pulsarClient
+ .newConsumer(Schema.INT32)
+ .topic(topicName)
+ .isAckReceiptEnabled(true)
+ .subscriptionName("test")
+ .subscriptionType(SubscriptionType.Shared)
+ .enableBatchIndexAcknowledgment(true)
+ .subscribe();
+
+ Message<Integer> message = consumer.receive(3, TimeUnit.SECONDS);
+ Assert.assertNull(message);
+
+ // abort txn1
+ txn1.abort().get();
+ // after txn1 aborted, consumer will receive messages txn1 contains
+ int receiveCounter = 0;
+ while((message = consumer.receive(3, TimeUnit.SECONDS)) != null) {
+ Assert.assertEquals(message.getValue().intValue(), receiveCounter);
+ receiveCounter ++;
+ }
+ Assert.assertEquals(receiveCounter, count / 2);
+ }
+
@Test(dataProvider="enableBatch")
private void produceCommitTest(boolean enableBatch) throws Exception {
@Cleanup
diff --git
a/pulsar-client-admin/src/test/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImplTest.java
b/pulsar-client-admin/src/test/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImplTest.java
index d278a187690..06b9c0e555b 100644
---
a/pulsar-client-admin/src/test/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImplTest.java
+++
b/pulsar-client-admin/src/test/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImplTest.java
@@ -34,8 +34,6 @@ import
org.apache.pulsar.client.api.AuthenticationDataProvider;
import org.apache.pulsar.client.api.EncodedAuthenticationParameterSupport;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
-import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
-import org.testng.Assert;
import org.testng.annotations.Test;
/**
diff --git
a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/BitSetRecyclableRecyclableTest.java
b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/BitSetRecyclableRecyclableTest.java
index 0f42f35608a..b1a94c49162 100644
---
a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/BitSetRecyclableRecyclableTest.java
+++
b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/BitSetRecyclableRecyclableTest.java
@@ -45,4 +45,22 @@ public class BitSetRecyclableRecyclableTest {
Assert.assertTrue(bitset1.get(128));
Assert.assertFalse(bitset1.get(256));
}
+
+ @Test
+ public void testBitSetEmpty() {
+ BitSetRecyclable bitSet = BitSetRecyclable.create();
+ bitSet.set(0, 5);
+ bitSet.clear(1);
+ bitSet.clear(2);
+ bitSet.clear(3);
+ long[] array = bitSet.toLongArray();
+ Assert.assertFalse(bitSet.isEmpty());
+
Assert.assertFalse(BitSetRecyclable.create().resetWords(array).isEmpty());
+ bitSet.clear(0);
+ bitSet.clear(4);
+ Assert.assertTrue(bitSet.isEmpty());
+ long[] array1 = bitSet.toLongArray();
+
Assert.assertTrue(BitSetRecyclable.create().resetWords(array1).isEmpty());
+ bitSet.recycle();
+ }
}
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceCache.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceCache.java
index 38d85335bcd..7b73780caac 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceCache.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceCache.java
@@ -18,7 +18,6 @@
*/
package org.apache.pulsar.functions.instance;
-import io.netty.util.concurrent.DefaultThreadFactory;
import lombok.Getter;
import java.util.concurrent.Executors;
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/ClusterServiceCoordinator.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/ClusterServiceCoordinator.java
index d15d2ba90c5..5dbb61456e4 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/ClusterServiceCoordinator.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/ClusterServiceCoordinator.java
@@ -31,9 +31,6 @@ import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.util.ExecutorProvider;
-
-import java.util.HashMap;
-import java.util.Map;
import java.util.function.Supplier;
@Slf4j
diff --git
a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticSearchJavaRestClient.java
b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticSearchJavaRestClient.java
deleted file mode 100644
index e420d7a5cef..00000000000
---
a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticSearchJavaRestClient.java
+++ /dev/null
@@ -1,224 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.io.elasticsearch.client.elastic;
-
-import co.elastic.clients.elasticsearch.ElasticsearchClient;
-import co.elastic.clients.elasticsearch._types.ElasticsearchException;
-import co.elastic.clients.elasticsearch._types.Result;
-import co.elastic.clients.elasticsearch.core.DeleteRequest;
-import co.elastic.clients.elasticsearch.core.DeleteResponse;
-import co.elastic.clients.elasticsearch.core.IndexRequest;
-import co.elastic.clients.elasticsearch.core.IndexResponse;
-import co.elastic.clients.elasticsearch.core.SearchRequest;
-import co.elastic.clients.elasticsearch.core.SearchResponse;
-import co.elastic.clients.elasticsearch.indices.CreateIndexRequest;
-import co.elastic.clients.elasticsearch.indices.CreateIndexResponse;
-import co.elastic.clients.elasticsearch.indices.DeleteIndexRequest;
-import co.elastic.clients.elasticsearch.indices.ExistsRequest;
-import co.elastic.clients.elasticsearch.indices.IndexSettings;
-import co.elastic.clients.elasticsearch.indices.RefreshRequest;
-import co.elastic.clients.json.jackson.JacksonJsonpMapper;
-import co.elastic.clients.transport.ElasticsearchTransport;
-import co.elastic.clients.transport.rest_client.RestClientTransport;
-import com.fasterxml.jackson.annotation.JsonInclude;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.SerializationFeature;
-import com.google.common.annotations.VisibleForTesting;
-import java.io.IOException;
-import java.util.Map;
-import java.util.Objects;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.http.HttpHost;
-import org.apache.pulsar.io.elasticsearch.ElasticSearchConfig;
-import org.apache.pulsar.io.elasticsearch.client.BulkProcessor;
-import org.apache.pulsar.io.elasticsearch.client.RestClient;
-import org.elasticsearch.client.Node;
-import org.elasticsearch.client.RestClientBuilder;
-import org.opensearch.action.bulk.BulkProcessor;
-
-@Slf4j
-public class ElasticSearchJavaRestClient extends RestClient {
-
- private final ElasticsearchClient client;
- private final ObjectMapper objectMapper = new ObjectMapper()
- .configure(SerializationFeature.INDENT_OUTPUT, false)
- .setSerializationInclusion(JsonInclude.Include.ALWAYS);
- private BulkProcessor bulkProcessor;
- private ElasticsearchTransport transport;
-
- @VisibleForTesting
- public void setBulkProcessor(BulkProcessor bulkProcessor) {
- this.bulkProcessor = bulkProcessor;
- }
-
- @VisibleForTesting
- public void setTransport(ElasticsearchTransport transport) {
- this.transport = transport;
- }
-
- public ElasticSearchJavaRestClient(ElasticSearchConfig elasticSearchConfig,
- BulkProcessor.Listener
bulkProcessorListener) {
- super(elasticSearchConfig, bulkProcessorListener);
-
- log.info("ElasticSearch URL {}", config.getElasticSearchUrl());
- final HttpHost[] httpHosts = getHttpHosts();
-
- RestClientBuilder builder =
org.elasticsearch.client.RestClient.builder(httpHosts)
- .setRequestConfigCallback(builder1 -> builder1
-
.setContentCompressionEnabled(config.isCompressionEnabled())
-
.setConnectionRequestTimeout(config.getConnectionRequestTimeoutInMs())
- .setConnectTimeout(config.getConnectTimeoutInMs())
- .setSocketTimeout(config.getSocketTimeoutInMs()))
- .setHttpClientConfigCallback(this.configCallback)
- .setFailureListener(new
org.elasticsearch.client.RestClient.FailureListener() {
- public void onFailure(Node node) {
- log.warn("Node host={} failed", node.getHost());
- }
- });
- transport = new RestClientTransport(builder.build(), new
JacksonJsonpMapper(objectMapper));
- client = new ElasticsearchClient(transport);
- if (elasticSearchConfig.isBulkEnabled()) {
- bulkProcessor = new ElasticBulkProcessor(elasticSearchConfig,
client, bulkProcessorListener);
- } else {
- bulkProcessor = null;
- }
- }
-
- @Override
- public boolean indexExists(String index) throws IOException {
- final ExistsRequest request = new ExistsRequest.Builder()
- .index(index)
- .build();
- return client.indices().exists(request).value();
- }
-
- @Override
- public boolean createIndex(String index) throws IOException {
- final CreateIndexRequest createIndexRequest = new
CreateIndexRequest.Builder()
- .index(index)
- .settings(new IndexSettings.Builder()
- .numberOfShards(config.getIndexNumberOfShards() + "")
- .numberOfReplicas(config.getIndexNumberOfReplicas() +
"")
- .build()
- )
- .build();
- try {
- final CreateIndexResponse createIndexResponse =
client.indices().create(createIndexRequest);
- if ((createIndexResponse.acknowledged())
- && createIndexResponse.shardsAcknowledged()) {
- return true;
- }
- throw new IOException("Unable to create index, acknowledged: " +
createIndexResponse.acknowledged()
- + " shardsAcknowledged: " +
createIndexResponse.shardsAcknowledged());
- } catch (ElasticsearchException ex) {
- final String errorType =
Objects.requireNonNull(ex.response().error().type());
- if (errorType.contains("resource_already_exists_exception")) {
- return false;
- }
- throw ex;
- }
- }
-
- @Override
- public boolean deleteIndex(String index) throws IOException {
- return client.indices().delete(new
DeleteIndexRequest.Builder().index(index).build()).acknowledged();
- }
-
- @Override
- public boolean deleteDocument(String index, String documentId) throws
IOException {
- final DeleteRequest req = new
- DeleteRequest.Builder()
- .index(config.getIndexName())
- .id(documentId)
- .build();
-
- DeleteResponse deleteResponse = client.delete(req);
- return deleteResponse.result().equals(Result.Deleted) ||
deleteResponse.result().equals(Result.NotFound);
- }
-
- @Override
- public boolean indexDocument(String index, String documentId, String
documentSource) throws IOException {
- final Map mapped = objectMapper.readValue(documentSource, Map.class);
- final IndexRequest<Object> indexRequest = new IndexRequest.Builder<>()
- .index(config.getIndexName())
- .document(mapped)
- .id(documentId)
- .build();
- final IndexResponse indexResponse = client.index(indexRequest);
-
- return indexResponse.result().equals(Result.Created) ||
indexResponse.result().equals(Result.Updated);
- }
-
- public SearchResponse<Map> search(String indexName) throws IOException {
- return search(indexName, "*:*");
- }
-
- @VisibleForTesting
- public SearchResponse<Map> search(String indexName, String query) throws
IOException {
- final RefreshRequest refreshRequest = new
RefreshRequest.Builder().index(indexName).build();
- client.indices().refresh(refreshRequest);
-
- query = query.replace("/", "\\/");
- return client.search(new SearchRequest.Builder().index(indexName)
- .q(query)
- .build(), Map.class);
- }
-
- @Override
- public long totalHits(String indexName) throws IOException {
- return totalHits(indexName, "*:*");
- }
-
- @Override
- public long totalHits(String indexName, String query) throws IOException {
- final SearchResponse<Map> searchResponse = search(indexName, query);
- return searchResponse.hits().total().value();
- }
-
- @Override
- public BulkProcessor getBulkProcessor() {
- if (bulkProcessor == null) {
- throw new IllegalStateException("bulkProcessor not enabled");
- }
- return bulkProcessor;
- }
-
- @Override
- public void closeClient() {
- if (bulkProcessor != null) {
- bulkProcessor.close();
- }
- // client doesn't need to be closed, only the transport instance
- try {
- transport.close();
- } catch (IOException e) {
- log.warn("error while closing the client", e);
- }
- }
-
- @VisibleForTesting
- public ElasticsearchClient getClient() {
- return client;
- }
-
- @VisibleForTesting
- public ElasticsearchTransport getTransport() {
- return transport;
- }
-}
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearch7SinkTester.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearch7SinkTester.java
deleted file mode 100644
index 65b38c677bf..00000000000
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearch7SinkTester.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.tests.integration.io.sinks;
-
-import java.util.Optional;
-import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
-import org.testcontainers.elasticsearch.ElasticsearchContainer;
-
-public class ElasticSearch7SinkTester extends ElasticSearchSinkTester {
-
- public static final String ELASTICSEARCH_7 =
Optional.ofNullable(System.getenv("ELASTICSEARCH_IMAGE_V7"))
- .orElse("docker.elastic.co/elasticsearch/elasticsearch:7.17.7");
-
-
- public ElasticSearch7SinkTester(boolean schemaEnable) {
- super(schemaEnable);
- }
-
- @Override
- protected ElasticsearchContainer createSinkService(PulsarCluster cluster) {
- return new ElasticsearchContainer(ELASTICSEARCH_7)
- .withEnv("ES_JAVA_OPTS", "-Xms128m -Xmx256m");
- }
-
-}
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearch8SinkTester.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearch8SinkTester.java
deleted file mode 100644
index bb52c4ff03f..00000000000
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearch8SinkTester.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.tests.integration.io.sinks;
-
-import java.util.Optional;
-import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
-import org.testcontainers.elasticsearch.ElasticsearchContainer;
-
-public class ElasticSearch8SinkTester extends ElasticSearchSinkTester {
-
- public static final String ELASTICSEARCH_8 =
Optional.ofNullable(System.getenv("ELASTICSEARCH_IMAGE_V8"))
- .orElse("docker.elastic.co/elasticsearch/elasticsearch:8.5.1");
-
-
- public ElasticSearch8SinkTester(boolean schemaEnable) {
- super(schemaEnable);
- }
-
- @Override
- protected ElasticsearchContainer createSinkService(PulsarCluster cluster) {
- return new ElasticsearchContainer(ELASTICSEARCH_8)
- .withEnv("ES_JAVA_OPTS", "-Xms128m -Xmx256m")
- .withEnv("xpack.security.enabled", "false")
- .withEnv("xpack.security.http.ssl.enabled", "false");
- }
-
-}
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/OpenSearchSinkTester.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/OpenSearchSinkTester.java
deleted file mode 100644
index 1e10cc4189c..00000000000
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/OpenSearchSinkTester.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.tests.integration.io.sinks;
-
-import java.util.Optional;
-import org.apache.http.HttpHost;
-import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
-import org.awaitility.Awaitility;
-import org.opensearch.action.search.SearchRequest;
-import org.opensearch.action.search.SearchResponse;
-import org.opensearch.client.RequestOptions;
-import org.opensearch.client.RestClient;
-import org.opensearch.client.RestClientBuilder;
-import org.opensearch.client.RestHighLevelClient;
-import org.testcontainers.elasticsearch.ElasticsearchContainer;
-import org.testcontainers.utility.DockerImageName;
-
-import java.util.Map;
-
-import static org.testng.Assert.assertTrue;
-
-public class OpenSearchSinkTester extends ElasticSearchSinkTester {
-
- public static final String OPENSEARCH =
Optional.ofNullable(System.getenv("OPENSEARCH_IMAGE"))
- .orElse("opensearchproject/opensearch:1.2.4");
-
- private RestHighLevelClient elasticClient;
-
-
- public OpenSearchSinkTester(boolean schemaEnable) {
- super(schemaEnable);
- }
-
- @Override
- protected ElasticsearchContainer createSinkService(PulsarCluster cluster) {
- DockerImageName dockerImageName = DockerImageName.parse(OPENSEARCH)
-
.asCompatibleSubstituteFor("docker.elastic.co/elasticsearch/elasticsearch");
- return new ElasticsearchContainer(dockerImageName)
- .withEnv("OPENSEARCH_JAVA_OPTS", "-Xms128m -Xmx256m")
- .withEnv("bootstrap.memory_lock", "true")
- .withEnv("plugins.security.disabled", "true");
- }
-
- @Override
- public void prepareSink() throws Exception {
- RestClientBuilder builder = RestClient.builder(
- new HttpHost(
- "localhost",
- serviceContainer.getMappedPort(9200),
- "http"));
- elasticClient = new RestHighLevelClient(builder);
- }
-
- @Override
- public void validateSinkResult(Map<String, String> kvs) {
- org.opensearch.action.search.SearchRequest searchRequest = new
SearchRequest("test-index");
-
- Awaitility.await().untilAsserted(() -> {
- SearchResponse searchResult = elasticClient.search(searchRequest,
RequestOptions.DEFAULT);
- assertTrue(searchResult.getHits().getTotalHits().value > 0,
searchResult.toString());
- });
- }
-
- @Override
- public void close() throws Exception {
- super.close();
- if (elasticClient != null) {
- elasticClient.close();
- elasticClient = null;
- }
- }
-}