This is an automated email from the ASF dual-hosted git repository.

xiangying pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new 0eaa6717ed4 [branch-2.10][test]Run and fix tests (#19636)
0eaa6717ed4 is described below

commit 0eaa6717ed4fc9c1a2d09a64caeba0b2e163ae03
Author: Xiangying Meng <[email protected]>
AuthorDate: Sun Feb 26 18:35:00 2023 +0800

    [branch-2.10][test]Run and fix tests (#19636)
    
    Co-authored-by: Cong Zhao <[email protected]>
    Co-authored-by: fengyubiao <[email protected]>
    Co-authored-by: AloysZhang <[email protected]>
---
 .../pulsar/tests/MockitoCleanupListener.java       |   3 +-
 .../mledger/util/PositionAckSetUtil.java           |   7 +
 .../broker/admin/impl/PersistentTopicsBase.java    |   3 +-
 .../loadbalance/impl/ModularLoadManagerImpl.java   |   2 +-
 .../broker/service/AbstractBaseDispatcher.java     |  13 ++
 .../pulsar/broker/web/PulsarWebResource.java       |   2 +-
 .../namespace/NamespaceOwnershipListenerTests.java |   5 +-
 .../client/impl/TransactionEndToEndTest.java       |  78 +++++++
 .../admin/internal/PulsarAdminBuilderImplTest.java |   2 -
 .../BitSetRecyclableRecyclableTest.java            |  18 ++
 .../pulsar/functions/instance/InstanceCache.java   |   1 -
 .../worker/ClusterServiceCoordinator.java          |   3 -
 .../elastic/ElasticSearchJavaRestClient.java       | 224 ---------------------
 .../io/sinks/ElasticSearch7SinkTester.java         |  41 ----
 .../io/sinks/ElasticSearch8SinkTester.java         |  43 ----
 .../integration/io/sinks/OpenSearchSinkTester.java |  88 --------
 16 files changed, 124 insertions(+), 409 deletions(-)

diff --git a/buildtools/src/main/java/org/apache/pulsar/tests/MockitoCleanupListener.java b/buildtools/src/main/java/org/apache/pulsar/tests/MockitoCleanupListener.java
index 73fff1bb7e2..dcc5707afba 100644
--- a/buildtools/src/main/java/org/apache/pulsar/tests/MockitoCleanupListener.java
+++ b/buildtools/src/main/java/org/apache/pulsar/tests/MockitoCleanupListener.java
@@ -39,7 +39,8 @@ public class MockitoCleanupListener extends BetweenTestClassesListenerAdapter {
     protected void onBetweenTestClasses(Class<?> endedTestClass, Class<?> startedTestClass) {
         if (MOCKITO_CLEANUP_ENABLED) {
             if (MockitoThreadLocalStateCleaner.INSTANCE.isEnabled()) {
-                LOG.info("Cleaning up Mockito's ThreadSafeMockingProgress.MOCKING_PROGRESS_PROVIDER thread local state.");
+                LOG.info("Cleaning up Mockito's ThreadSafeMockingProgress.MOCKING_PROGRESS_PROVIDER "
+                        + "thread local state.");
                 MockitoThreadLocalStateCleaner.INSTANCE.cleanup();
             }
             cleanupMockitoInline();
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/PositionAckSetUtil.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/PositionAckSetUtil.java
index da3043e7458..d1bc95c768a 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/PositionAckSetUtil.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/PositionAckSetUtil.java
@@ -60,6 +60,13 @@ public class PositionAckSetUtil {
         return ackSet;
     }
 
+    public static boolean isAckSetEmpty(long[] ackSet) {
+        BitSetRecyclable bitSet =  BitSetRecyclable.create().resetWords(ackSet);
+        boolean isEmpty = bitSet.isEmpty();
+        bitSet.recycle();
+        return isEmpty;
+    }
+
     //This method is compare two position which position is bigger than another one.
     //When the ledgerId and entryId in this position is same to another one and two position all have ack set, it will
     //compare the ack set next bit index is bigger than another one.
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 1be743a7ee9..51525303ac0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -5068,7 +5068,8 @@ public class PersistentTopicsBase extends AdminResource {
             // Redirect the request to the appropriate broker if this broker is not the owner of the topic
             validateTopicOwnership(topicName, authoritative);
 
-            getReplicatedSubscriptionStatusFromLocalBroker(topicName, subName).get();
+            Map res = getReplicatedSubscriptionStatusFromLocalBroker(topicName, subName).get();
+            asyncResponse.resume(res);
         } catch (Exception e) {
             log.error("[{}] Failed to get replicated subscription status on {} {}", clientAppId(),
                     topicName, subName, e);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
index 1beb8ec7415..d81f6949f43 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
@@ -64,8 +64,8 @@ import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared.BrokerTopicLo
 import org.apache.pulsar.broker.resources.ClusterResources;
 import org.apache.pulsar.broker.stats.prometheus.metrics.Summary;
 import org.apache.pulsar.client.admin.PulsarAdminException;
-import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.client.util.ExecutorProvider;
+import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.NamespaceBundleFactory;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.ServiceUnitId;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
index 6b9ddcc1162..0b5108eeab8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
@@ -20,6 +20,7 @@
 package org.apache.pulsar.broker.service;
 
 import static org.apache.bookkeeper.mledger.util.PositionAckSetUtil.andAckSet;
+import static org.apache.bookkeeper.mledger.util.PositionAckSetUtil.isAckSetEmpty;
 import com.google.common.collect.ImmutableList;
 import io.netty.buffer.ByteBuf;
 import java.util.ArrayList;
@@ -246,6 +247,18 @@ public abstract class AbstractBaseDispatcher implements Dispatcher {
                                 // if actSet is null, use pendingAck ackSet
                                 ackSet = positionInPendingAck.getAckSet();
                             }
+                            // if the result of pendingAckSet(in pendingAckHandle) AND the ackSet(in cursor) is empty
+                            // filter this entry
+                            if (isAckSetEmpty(ackSet)) {
+                                entries.set(i, null);
+                                entry.release();
+                                continue;
+                            }
+                        } else {
+                            // filter non-batch message in pendingAck state
+                            entries.set(i, null);
+                            entry.release();
+                            continue;
                         }
                     }
                 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index 68e5b7da824..976ba5d328e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -21,10 +21,10 @@ package org.apache.pulsar.broker.web;
 import static com.google.common.base.Preconditions.checkArgument;
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.commons.lang3.StringUtils.isBlank;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.github.benmanes.caffeine.cache.CacheLoader;
 import com.github.benmanes.caffeine.cache.Caffeine;
 import com.github.benmanes.caffeine.cache.LoadingCache;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.BoundType;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Range;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceOwnershipListenerTests.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceOwnershipListenerTests.java
index c0a46881628..5de86180716 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceOwnershipListenerTests.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceOwnershipListenerTests.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.broker.namespace;
 
 import com.google.common.collect.Sets;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.pulsar.broker.service.BrokerTestBase;
 import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -30,11 +31,9 @@ import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
-
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.testng.Assert.assertTrue;
@@ -55,7 +54,7 @@ public class NamespaceOwnershipListenerTests extends BrokerTestBase {
     }
 
     @Test
-    public void testNamespaceBundleOwnershipListener() throws PulsarAdminException, InterruptedException, PulsarClientException {
+    public void testNamespaceBundleOwnershipListener() throws Exception {
 
         final CountDownLatch countDownLatch = new CountDownLatch(2);
         final AtomicBoolean onLoad = new AtomicBoolean(false);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
index b0a4b28bbdc..a51ad6998ed 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
@@ -171,6 +171,84 @@ public class TransactionEndToEndTest extends TransactionTestBase {
         assertNull(consumer.receive(2, TimeUnit.SECONDS));
     }
 
+
+    @Test(dataProvider="enableBatch")
+    private void testFilterMsgsInPendingAckStateWhenConsumerDisconnect(boolean enableBatch) throws Exception {
+        final String topicName = NAMESPACE1 + "/testFilterMsgsInPendingAckStateWhenConsumerDisconnect-" + enableBatch;
+        final int count = 10;
+
+        @Cleanup
+        Producer<Integer> producer = null;
+        if (enableBatch) {
+            producer = pulsarClient
+                    .newProducer(Schema.INT32)
+                    .topic(topicName)
+                    .enableBatching(true)
+                    .batchingMaxPublishDelay(1, TimeUnit.HOURS)
+                    .batchingMaxMessages(count).create();
+        } else {
+            producer = pulsarClient
+                    .newProducer(Schema.INT32)
+                    .topic(topicName)
+                    .enableBatching(false).create();
+        }
+
+        @Cleanup
+        Consumer<Integer> consumer = pulsarClient
+                .newConsumer(Schema.INT32)
+                .topic(topicName)
+                .isAckReceiptEnabled(true)
+                .subscriptionName("test")
+                .subscriptionType(SubscriptionType.Shared)
+                .enableBatchIndexAcknowledgment(true)
+                .subscribe();
+
+        for (int i = 0; i < count; i++) {
+            producer.sendAsync(i);
+        }
+
+        Transaction txn1 = getTxn();
+
+        Transaction txn2 = getTxn();
+
+
+        // txn1 ack half of messages and don't end the txn1
+        for (int i = 0; i < count / 2; i++) {
+            consumer.acknowledgeAsync(consumer.receive().getMessageId(), txn1).get();
+        }
+
+        // txn2 ack the rest half of messages and commit tnx2
+        for (int i = count / 2; i < count; i++) {
+            consumer.acknowledgeAsync(consumer.receive().getMessageId(), txn2).get();
+        }
+        // commit txn2
+        txn2.commit().get();
+
+        // close and re-create consumer
+        consumer.close();
+        consumer = pulsarClient
+                .newConsumer(Schema.INT32)
+                .topic(topicName)
+                .isAckReceiptEnabled(true)
+                .subscriptionName("test")
+                .subscriptionType(SubscriptionType.Shared)
+                .enableBatchIndexAcknowledgment(true)
+                .subscribe();
+
+        Message<Integer> message = consumer.receive(3, TimeUnit.SECONDS);
+        Assert.assertNull(message);
+
+        // abort txn1
+        txn1.abort().get();
+        // after txn1 aborted, consumer will receive messages txn1 contains
+        int receiveCounter = 0;
+        while((message = consumer.receive(3, TimeUnit.SECONDS)) != null) {
+            Assert.assertEquals(message.getValue().intValue(), receiveCounter);
+            receiveCounter ++;
+        }
+        Assert.assertEquals(receiveCounter, count / 2);
+    }
+
     @Test(dataProvider="enableBatch")
     private void produceCommitTest(boolean enableBatch) throws Exception {
         @Cleanup
diff --git a/pulsar-client-admin/src/test/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImplTest.java b/pulsar-client-admin/src/test/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImplTest.java
index d278a187690..06b9c0e555b 100644
--- a/pulsar-client-admin/src/test/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImplTest.java
+++ b/pulsar-client-admin/src/test/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImplTest.java
@@ -34,8 +34,6 @@ import org.apache.pulsar.client.api.AuthenticationDataProvider;
 import org.apache.pulsar.client.api.EncodedAuthenticationParameterSupport;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
-import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
-import org.testng.Assert;
 import org.testng.annotations.Test;
 
 /**
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/BitSetRecyclableRecyclableTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/BitSetRecyclableRecyclableTest.java
index 0f42f35608a..b1a94c49162 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/BitSetRecyclableRecyclableTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/BitSetRecyclableRecyclableTest.java
@@ -45,4 +45,22 @@ public class BitSetRecyclableRecyclableTest {
         Assert.assertTrue(bitset1.get(128));
         Assert.assertFalse(bitset1.get(256));
     }
+
+    @Test
+    public void testBitSetEmpty() {
+        BitSetRecyclable bitSet = BitSetRecyclable.create();
+        bitSet.set(0, 5);
+        bitSet.clear(1);
+        bitSet.clear(2);
+        bitSet.clear(3);
+        long[] array = bitSet.toLongArray();
+        Assert.assertFalse(bitSet.isEmpty());
+        Assert.assertFalse(BitSetRecyclable.create().resetWords(array).isEmpty());
+        bitSet.clear(0);
+        bitSet.clear(4);
+        Assert.assertTrue(bitSet.isEmpty());
+        long[] array1 = bitSet.toLongArray();
+        Assert.assertTrue(BitSetRecyclable.create().resetWords(array1).isEmpty());
+        bitSet.recycle();
+    }
 }
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceCache.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceCache.java
index 38d85335bcd..7b73780caac 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceCache.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceCache.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pulsar.functions.instance;
 
-import io.netty.util.concurrent.DefaultThreadFactory;
 import lombok.Getter;
 
 import java.util.concurrent.Executors;
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/ClusterServiceCoordinator.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/ClusterServiceCoordinator.java
index d15d2ba90c5..5dbb61456e4 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/ClusterServiceCoordinator.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/ClusterServiceCoordinator.java
@@ -31,9 +31,6 @@ import lombok.Getter;
 import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.util.ExecutorProvider;
-
-import java.util.HashMap;
-import java.util.Map;
 import java.util.function.Supplier;
 
 @Slf4j
diff --git 
a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticSearchJavaRestClient.java
 
b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticSearchJavaRestClient.java
deleted file mode 100644
index e420d7a5cef..00000000000
--- 
a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticSearchJavaRestClient.java
+++ /dev/null
@@ -1,224 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.io.elasticsearch.client.elastic;
-
-import co.elastic.clients.elasticsearch.ElasticsearchClient;
-import co.elastic.clients.elasticsearch._types.ElasticsearchException;
-import co.elastic.clients.elasticsearch._types.Result;
-import co.elastic.clients.elasticsearch.core.DeleteRequest;
-import co.elastic.clients.elasticsearch.core.DeleteResponse;
-import co.elastic.clients.elasticsearch.core.IndexRequest;
-import co.elastic.clients.elasticsearch.core.IndexResponse;
-import co.elastic.clients.elasticsearch.core.SearchRequest;
-import co.elastic.clients.elasticsearch.core.SearchResponse;
-import co.elastic.clients.elasticsearch.indices.CreateIndexRequest;
-import co.elastic.clients.elasticsearch.indices.CreateIndexResponse;
-import co.elastic.clients.elasticsearch.indices.DeleteIndexRequest;
-import co.elastic.clients.elasticsearch.indices.ExistsRequest;
-import co.elastic.clients.elasticsearch.indices.IndexSettings;
-import co.elastic.clients.elasticsearch.indices.RefreshRequest;
-import co.elastic.clients.json.jackson.JacksonJsonpMapper;
-import co.elastic.clients.transport.ElasticsearchTransport;
-import co.elastic.clients.transport.rest_client.RestClientTransport;
-import com.fasterxml.jackson.annotation.JsonInclude;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.SerializationFeature;
-import com.google.common.annotations.VisibleForTesting;
-import java.io.IOException;
-import java.util.Map;
-import java.util.Objects;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.http.HttpHost;
-import org.apache.pulsar.io.elasticsearch.ElasticSearchConfig;
-import org.apache.pulsar.io.elasticsearch.client.BulkProcessor;
-import org.apache.pulsar.io.elasticsearch.client.RestClient;
-import org.elasticsearch.client.Node;
-import org.elasticsearch.client.RestClientBuilder;
-import org.opensearch.action.bulk.BulkProcessor;
-
-@Slf4j
-public class ElasticSearchJavaRestClient extends RestClient {
-
-    private final ElasticsearchClient client;
-    private final ObjectMapper objectMapper = new ObjectMapper()
-            .configure(SerializationFeature.INDENT_OUTPUT, false)
-            .setSerializationInclusion(JsonInclude.Include.ALWAYS);
-    private BulkProcessor bulkProcessor;
-    private ElasticsearchTransport transport;
-
-    @VisibleForTesting
-    public void setBulkProcessor(BulkProcessor bulkProcessor) {
-        this.bulkProcessor = bulkProcessor;
-    }
-
-    @VisibleForTesting
-    public void setTransport(ElasticsearchTransport transport) {
-        this.transport = transport;
-    }
-
-    public ElasticSearchJavaRestClient(ElasticSearchConfig elasticSearchConfig,
-                                       BulkProcessor.Listener 
bulkProcessorListener) {
-        super(elasticSearchConfig, bulkProcessorListener);
-
-        log.info("ElasticSearch URL {}", config.getElasticSearchUrl());
-        final HttpHost[] httpHosts = getHttpHosts();
-
-        RestClientBuilder builder = 
org.elasticsearch.client.RestClient.builder(httpHosts)
-                .setRequestConfigCallback(builder1 -> builder1
-                        
.setContentCompressionEnabled(config.isCompressionEnabled())
-                        
.setConnectionRequestTimeout(config.getConnectionRequestTimeoutInMs())
-                        .setConnectTimeout(config.getConnectTimeoutInMs())
-                        .setSocketTimeout(config.getSocketTimeoutInMs()))
-                .setHttpClientConfigCallback(this.configCallback)
-                .setFailureListener(new 
org.elasticsearch.client.RestClient.FailureListener() {
-                    public void onFailure(Node node) {
-                        log.warn("Node host={} failed", node.getHost());
-                    }
-                });
-        transport = new RestClientTransport(builder.build(), new 
JacksonJsonpMapper(objectMapper));
-        client = new ElasticsearchClient(transport);
-        if (elasticSearchConfig.isBulkEnabled()) {
-            bulkProcessor = new ElasticBulkProcessor(elasticSearchConfig, 
client, bulkProcessorListener);
-        } else {
-            bulkProcessor = null;
-        }
-    }
-
-    @Override
-    public boolean indexExists(String index) throws IOException {
-        final ExistsRequest request = new ExistsRequest.Builder()
-                .index(index)
-                .build();
-        return client.indices().exists(request).value();
-    }
-
-    @Override
-    public boolean createIndex(String index) throws IOException {
-        final CreateIndexRequest createIndexRequest = new 
CreateIndexRequest.Builder()
-                .index(index)
-                .settings(new IndexSettings.Builder()
-                        .numberOfShards(config.getIndexNumberOfShards() + "")
-                        .numberOfReplicas(config.getIndexNumberOfReplicas() + 
"")
-                        .build()
-                )
-                .build();
-        try {
-            final CreateIndexResponse createIndexResponse = 
client.indices().create(createIndexRequest);
-            if ((createIndexResponse.acknowledged())
-                    && createIndexResponse.shardsAcknowledged()) {
-                return true;
-            }
-            throw new IOException("Unable to create index, acknowledged: " + 
createIndexResponse.acknowledged()
-                    + " shardsAcknowledged: " + 
createIndexResponse.shardsAcknowledged());
-        } catch (ElasticsearchException ex) {
-            final String errorType = 
Objects.requireNonNull(ex.response().error().type());
-            if (errorType.contains("resource_already_exists_exception")) {
-                return false;
-            }
-            throw ex;
-        }
-    }
-
-    @Override
-    public boolean deleteIndex(String index) throws IOException {
-        return client.indices().delete(new 
DeleteIndexRequest.Builder().index(index).build()).acknowledged();
-    }
-
-    @Override
-    public boolean deleteDocument(String index, String documentId) throws 
IOException {
-        final DeleteRequest req = new
-                DeleteRequest.Builder()
-                .index(config.getIndexName())
-                .id(documentId)
-                .build();
-
-        DeleteResponse deleteResponse = client.delete(req);
-        return deleteResponse.result().equals(Result.Deleted) || 
deleteResponse.result().equals(Result.NotFound);
-    }
-
-    @Override
-    public boolean indexDocument(String index, String documentId, String 
documentSource) throws IOException {
-        final Map mapped = objectMapper.readValue(documentSource, Map.class);
-        final IndexRequest<Object> indexRequest = new IndexRequest.Builder<>()
-                .index(config.getIndexName())
-                .document(mapped)
-                .id(documentId)
-                .build();
-        final IndexResponse indexResponse = client.index(indexRequest);
-
-        return indexResponse.result().equals(Result.Created) || 
indexResponse.result().equals(Result.Updated);
-    }
-
-    public SearchResponse<Map> search(String indexName) throws IOException {
-        return search(indexName, "*:*");
-    }
-
-    @VisibleForTesting
-    public SearchResponse<Map> search(String indexName, String query) throws 
IOException {
-        final RefreshRequest refreshRequest = new 
RefreshRequest.Builder().index(indexName).build();
-        client.indices().refresh(refreshRequest);
-
-        query = query.replace("/", "\\/");
-        return client.search(new SearchRequest.Builder().index(indexName)
-                .q(query)
-                .build(), Map.class);
-    }
-
-    @Override
-    public long totalHits(String indexName) throws IOException {
-        return totalHits(indexName, "*:*");
-    }
-
-    @Override
-    public long totalHits(String indexName, String query) throws IOException {
-        final SearchResponse<Map> searchResponse = search(indexName, query);
-        return searchResponse.hits().total().value();
-    }
-
-    @Override
-    public BulkProcessor getBulkProcessor() {
-        if (bulkProcessor == null) {
-            throw new IllegalStateException("bulkProcessor not enabled");
-        }
-        return bulkProcessor;
-    }
-
-    @Override
-    public void closeClient() {
-        if (bulkProcessor != null) {
-            bulkProcessor.close();
-        }
-        // client doesn't need to be closed, only the transport instance
-        try {
-            transport.close();
-        } catch (IOException e) {
-            log.warn("error while closing the client", e);
-        }
-    }
-
-    @VisibleForTesting
-    public ElasticsearchClient getClient() {
-        return client;
-    }
-
-    @VisibleForTesting
-    public ElasticsearchTransport getTransport() {
-        return transport;
-    }
-}
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearch7SinkTester.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearch7SinkTester.java
deleted file mode 100644
index 65b38c677bf..00000000000
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearch7SinkTester.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.tests.integration.io.sinks;
-
-import java.util.Optional;
-import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
-import org.testcontainers.elasticsearch.ElasticsearchContainer;
-
-public class ElasticSearch7SinkTester extends ElasticSearchSinkTester {
-
-    public static final String ELASTICSEARCH_7 = 
Optional.ofNullable(System.getenv("ELASTICSEARCH_IMAGE_V7"))
-            .orElse("docker.elastic.co/elasticsearch/elasticsearch:7.17.7");
-
-
-    public ElasticSearch7SinkTester(boolean schemaEnable) {
-        super(schemaEnable);
-    }
-
-    @Override
-    protected ElasticsearchContainer createSinkService(PulsarCluster cluster) {
-        return new ElasticsearchContainer(ELASTICSEARCH_7)
-                .withEnv("ES_JAVA_OPTS", "-Xms128m -Xmx256m");
-    }
-
-}
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearch8SinkTester.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearch8SinkTester.java
deleted file mode 100644
index bb52c4ff03f..00000000000
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearch8SinkTester.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.tests.integration.io.sinks;
-
-import java.util.Optional;
-import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
-import org.testcontainers.elasticsearch.ElasticsearchContainer;
-
-public class ElasticSearch8SinkTester extends ElasticSearchSinkTester {
-
-    public static final String ELASTICSEARCH_8 = 
Optional.ofNullable(System.getenv("ELASTICSEARCH_IMAGE_V8"))
-            .orElse("docker.elastic.co/elasticsearch/elasticsearch:8.5.1");
-
-
-    public ElasticSearch8SinkTester(boolean schemaEnable) {
-        super(schemaEnable);
-    }
-
-    @Override
-    protected ElasticsearchContainer createSinkService(PulsarCluster cluster) {
-        return new ElasticsearchContainer(ELASTICSEARCH_8)
-                .withEnv("ES_JAVA_OPTS", "-Xms128m -Xmx256m")
-                .withEnv("xpack.security.enabled", "false")
-                .withEnv("xpack.security.http.ssl.enabled", "false");
-    }
-
-}
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/OpenSearchSinkTester.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/OpenSearchSinkTester.java
deleted file mode 100644
index 1e10cc4189c..00000000000
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/OpenSearchSinkTester.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.tests.integration.io.sinks;
-
-import java.util.Optional;
-import org.apache.http.HttpHost;
-import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
-import org.awaitility.Awaitility;
-import org.opensearch.action.search.SearchRequest;
-import org.opensearch.action.search.SearchResponse;
-import org.opensearch.client.RequestOptions;
-import org.opensearch.client.RestClient;
-import org.opensearch.client.RestClientBuilder;
-import org.opensearch.client.RestHighLevelClient;
-import org.testcontainers.elasticsearch.ElasticsearchContainer;
-import org.testcontainers.utility.DockerImageName;
-
-import java.util.Map;
-
-import static org.testng.Assert.assertTrue;
-
-public class OpenSearchSinkTester extends ElasticSearchSinkTester {
-
-    public static final String OPENSEARCH = Optional.ofNullable(System.getenv("OPENSEARCH_IMAGE"))
-            .orElse("opensearchproject/opensearch:1.2.4");
-
-    private RestHighLevelClient elasticClient;
-
-
-    public OpenSearchSinkTester(boolean schemaEnable) {
-        super(schemaEnable);
-    }
-
-    @Override
-    protected ElasticsearchContainer createSinkService(PulsarCluster cluster) {
-        DockerImageName dockerImageName = DockerImageName.parse(OPENSEARCH)
-                .asCompatibleSubstituteFor("docker.elastic.co/elasticsearch/elasticsearch");
-        return new ElasticsearchContainer(dockerImageName)
-                .withEnv("OPENSEARCH_JAVA_OPTS", "-Xms128m -Xmx256m")
-                .withEnv("bootstrap.memory_lock", "true")
-                .withEnv("plugins.security.disabled", "true");
-    }
-
-    @Override
-    public void prepareSink() throws Exception {
-        RestClientBuilder builder = RestClient.builder(
-                new HttpHost(
-                        "localhost",
-                        serviceContainer.getMappedPort(9200),
-                        "http"));
-        elasticClient = new RestHighLevelClient(builder);
-    }
-
-    @Override
-    public void validateSinkResult(Map<String, String> kvs) {
-        org.opensearch.action.search.SearchRequest searchRequest = new SearchRequest("test-index");
-
-        Awaitility.await().untilAsserted(() -> {
-            SearchResponse searchResult = elasticClient.search(searchRequest, RequestOptions.DEFAULT);
-            assertTrue(searchResult.getHits().getTotalHits().value > 0, searchResult.toString());
-        });
-    }
-
-    @Override
-    public void close() throws Exception {
-        super.close();
-        if (elasticClient != null) {
-            elasticClient.close();
-            elasticClient = null;
-        }
-    }
-}


Reply via email to