This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 8204a8ae398 [fix][test] Close the resource after the test. (#21732)
8204a8ae398 is described below
commit 8204a8ae3980eb4251f539f191debbaf300c7d9f
Author: Jiwei Guo <[email protected]>
AuthorDate: Mon Dec 18 10:20:34 2023 +0800
[fix][test] Close the resource after the test. (#21732)
---
.../pulsar/io/elasticsearch/ElasticSearchExtractTests.java | 13 +++++++++++++
.../pulsar/io/rabbitmq/source/RabbitMQSourceTest.java | 3 ++-
2 files changed, 15 insertions(+), 1 deletion(-)
diff --git a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchExtractTests.java b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchExtractTests.java
index cbc3de908c6..f5a2e36aef4 100644
--- a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchExtractTests.java
+++ b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchExtractTests.java
@@ -101,6 +101,7 @@ public class ElasticSearchExtractTests {
Pair<String, String> pair =
elasticSearchSink.extractIdAndDocument(genericObjectRecord);
assertEquals(pair.getLeft(), "1");
assertEquals(pair.getRight(),
"{\"c\":\"1\",\"d\":1,\"e\":{\"a\":\"a\",\"b\":true,\"d\":1.0,\"f\":1.0,\"i\":1,\"l\":10}}");
+ elasticSearchSink.close();
// two fields PK
ElasticSearchSink elasticSearchSink2 = new ElasticSearchSink();
@@ -113,6 +114,7 @@ public class ElasticSearchExtractTests {
Pair<String, String> pair2 =
elasticSearchSink2.extractIdAndDocument(genericObjectRecord);
assertEquals(pair2.getLeft(), "[\"1\",1]");
assertEquals(pair2.getRight(),
"{\"c\":\"1\",\"d\":1,\"e\":{\"a\":\"a\",\"b\":true,\"d\":1.0,\"f\":1.0,\"i\":1,\"l\":10}}");
+ elasticSearchSink2.close();
// default config with null PK => indexed with auto generated _id
ElasticSearchSink elasticSearchSink3 = new ElasticSearchSink();
@@ -122,6 +124,7 @@ public class ElasticSearchExtractTests {
Pair<String, String> pair3 =
elasticSearchSink3.extractIdAndDocument(genericObjectRecord);
assertNull(pair3.getLeft());
assertEquals(pair3.getRight(),
"{\"c\":\"1\",\"d\":1,\"e\":{\"a\":\"a\",\"b\":true,\"d\":1.0,\"f\":1.0,\"i\":1,\"l\":10}}");
+ elasticSearchSink3.close();
// default config with null PK + null value
ElasticSearchSink elasticSearchSink4 = new ElasticSearchSink();
@@ -146,6 +149,7 @@ public class ElasticSearchExtractTests {
});
assertNull(pair4.getLeft());
assertNull(pair4.getRight());
+ elasticSearchSink4.close();
}
@Test(dataProvider = "schemaType")
@@ -225,6 +229,7 @@ public class ElasticSearchExtractTests {
Pair<String, String> pair =
elasticSearchSink.extractIdAndDocument(genericObjectRecord);
assertEquals(pair.getLeft(), "[\"1\",1]");
assertEquals(pair.getRight(),
"{\"c\":\"1\",\"d\":1,\"e\":{\"a\":\"a\",\"b\":true,\"d\":1.0,\"f\":1.0,\"i\":1,\"l\":10}}");
+ elasticSearchSink.close();
elasticSearchSink = new ElasticSearchSink();
elasticSearchSink.open(ImmutableMap.of(
@@ -236,6 +241,7 @@ public class ElasticSearchExtractTests {
pair = elasticSearchSink.extractIdAndDocument(genericObjectRecord);
assertEquals(pair.getLeft(), "[\"1\",1]");
assertEquals(pair.getRight(),
"{\"a\":\"1\",\"b\":1,\"c\":\"1\",\"d\":1,\"e\":{\"a\":\"a\",\"b\":true,\"d\":1.0,\"f\":1.0,\"i\":1,\"l\":10}}");
+ elasticSearchSink.close();
elasticSearchSink = new ElasticSearchSink();
elasticSearchSink.open(ImmutableMap.of(
@@ -246,6 +252,7 @@ public class ElasticSearchExtractTests {
pair = elasticSearchSink.extractIdAndDocument(genericObjectRecord);
assertNull(pair.getLeft());
assertEquals(pair.getRight(),
"{\"c\":\"1\",\"d\":1,\"e\":{\"a\":\"a\",\"b\":true,\"d\":1.0,\"f\":1.0,\"i\":1,\"l\":10}}");
+ elasticSearchSink.close();
elasticSearchSink = new ElasticSearchSink();
elasticSearchSink.open(ImmutableMap.of("elasticSearchUrl",
"http://localhost:9200",
@@ -255,6 +262,7 @@ public class ElasticSearchExtractTests {
pair = elasticSearchSink.extractIdAndDocument(genericObjectRecord);
assertNull(pair.getLeft());
assertEquals(pair.getRight(),
"{\"a\":\"1\",\"b\":1,\"c\":\"1\",\"d\":1,\"e\":{\"a\":\"a\",\"b\":true,\"d\":1.0,\"f\":1.0,\"i\":1,\"l\":10}}");
+ elasticSearchSink.close();
// test null value
elasticSearchSink = new ElasticSearchSink();
@@ -291,6 +299,7 @@ public class ElasticSearchExtractTests {
});
assertEquals(pair.getLeft(), "[\"1\",1]");
assertNull(pair.getRight());
+ elasticSearchSink.close();
}
@Test(dataProvider = "schemaType")
@@ -326,6 +335,7 @@ public class ElasticSearchExtractTests {
"keyIgnore", "false"), null);
Pair<String, String> pair =
elasticSearchSink.extractIdAndDocument(genericObjectRecord);
assertEquals(pair.getKey(),
"{\"b_inside_inner\":\"0b_value_from_inner\",\"a_inside_inner\":\"a_value_from_inner\"}");
+ elasticSearchSink.close();
elasticSearchSink = new ElasticSearchSink();
elasticSearchSink.open(ImmutableMap.of(
@@ -336,6 +346,7 @@ public class ElasticSearchExtractTests {
"keyIgnore", "false"), null);
pair = elasticSearchSink.extractIdAndDocument(genericObjectRecord);
assertEquals(pair.getKey(),
"{\"a_inside_inner\":\"a_value_from_inner\",\"b_inside_inner\":\"0b_value_from_inner\"}");
+ elasticSearchSink.close();
}
@@ -378,6 +389,7 @@ public class ElasticSearchExtractTests {
"keyIgnore", "false"), null);
Pair<String, String> pair =
elasticSearchSink.extractIdAndDocument(genericObjectRecord);
assertEquals(pair.getKey(),
"[\"a_key\",\"0b_key\",\"c_key\",{\"b_inside_inner\":\"0b_value_from_inner\",\"a_inside_inner\":\"a_value_from_inner\"}]");
+ elasticSearchSink.close();
elasticSearchSink = new ElasticSearchSink();
elasticSearchSink.open(ImmutableMap.of(
@@ -388,6 +400,7 @@ public class ElasticSearchExtractTests {
"keyIgnore", "false"), null);
pair = elasticSearchSink.extractIdAndDocument(genericObjectRecord);
assertEquals(pair.getKey(),
"[\"a_key\",\"0b_key\",\"c_key\",{\"a_inside_inner\":\"a_value_from_inner\",\"b_inside_inner\":\"0b_value_from_inner\"}]");
+ elasticSearchSink.close();
}
private Record<GenericObject> getKeyValueGenericObject(SchemaType
schemaType, GenericSchema<GenericRecord> keySchema, GenericRecord
keyGenericRecord) {
diff --git a/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/source/RabbitMQSourceTest.java b/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/source/RabbitMQSourceTest.java
index abff93a3632..2771185b841 100644
--- a/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/source/RabbitMQSourceTest.java
+++ b/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/source/RabbitMQSourceTest.java
@@ -44,7 +44,7 @@ public class RabbitMQSourceTest {
}
@Test
- public void TestOpenAndWriteSink() {
+ public void TestOpenAndWriteSink() throws Exception {
Map<String, Object> configs = new HashMap<>();
configs.put("host", "localhost");
configs.put("port", "5672");
@@ -67,6 +67,7 @@ public class RabbitMQSourceTest {
// open should success
// rabbitmq service may need time to initialize
Awaitility.await().ignoreExceptions().untilAsserted(() ->
source.open(configs, null));
+ source.close();
}
}