This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch release-1.0.0 in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
commit 439f14285911702e3495e1dc61998fe86b8b6a61 Author: 卢春亮 <[email protected]> AuthorDate: Thu Feb 10 17:01:30 2022 +0800 [INLONG-2435][Sort] Fix Sort-standalone UT Exceptions (#2436) --- .../TestDefaultEvent2IndexRequestHandler.java | 7 +- .../sink/elasticsearch/TestEsCallbackListener.java | 117 ++++++++------------- .../sink/elasticsearch/TestEsChannelWorker.java | 35 +++--- .../sink/elasticsearch/TestEsOutputChannel.java | 30 +++--- .../sink/elasticsearch/TestEsSinkContext.java | 14 ++- 5 files changed, 94 insertions(+), 109 deletions(-) diff --git a/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestDefaultEvent2IndexRequestHandler.java b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestDefaultEvent2IndexRequestHandler.java index 5177bf8..725b8f1 100644 --- a/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestDefaultEvent2IndexRequestHandler.java +++ b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestDefaultEvent2IndexRequestHandler.java @@ -21,10 +21,12 @@ import static org.junit.Assert.assertEquals; import java.util.concurrent.LinkedBlockingQueue; +import org.apache.inlong.commons.config.metrics.MetricRegister; import org.apache.inlong.sort.standalone.channel.ProfileEvent; import org.junit.Test; import org.junit.runner.RunWith; import org.powermock.core.classloader.annotations.PowerMockIgnore; +import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; /** @@ -33,13 +35,16 @@ import org.powermock.modules.junit4.PowerMockRunner; */ @RunWith(PowerMockRunner.class) @PowerMockIgnore("javax.management.*") +@PrepareForTest({MetricRegister.class}) public class TestDefaultEvent2IndexRequestHandler { /** * test that ProfileEvent transform to EsIndexRequest + * + * @throws Exception */ @Test - public void test() { + public void test() throws Exception { LinkedBlockingQueue<EsIndexRequest> dispatchQueue = new LinkedBlockingQueue<>(); EsSinkContext context = TestEsSinkContext.mock(dispatchQueue); ProfileEvent event = TestEsSinkContext.mockProfileEvent(); diff --git a/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsCallbackListener.java b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsCallbackListener.java index 84441ef..da2bcb9 100644 --- a/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsCallbackListener.java +++ b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsCallbackListener.java @@ -19,6 +19,7 @@ package org.apache.inlong.sort.standalone.sink.elasticsearch; import java.util.concurrent.LinkedBlockingQueue; +import org.apache.inlong.commons.config.metrics.MetricRegister; import org.apache.inlong.sort.standalone.channel.ProfileEvent; import org.elasticsearch.action.DocWriteRequest.OpType; import org.elasticsearch.action.bulk.BulkItemResponse; @@ -38,13 +39,13 @@ import org.powermock.modules.junit4.PowerMockRunner; */ @RunWith(PowerMockRunner.class) @PowerMockIgnore("javax.management.*") -@PrepareForTest({EsSinkFactory.class}) +@PrepareForTest({EsSinkFactory.class, MetricRegister.class}) public class TestEsCallbackListener { private EsSinkContext context; @Before - public void before() { + public void before() throws Exception { LinkedBlockingQueue<EsIndexRequest> dispatchQueue = new LinkedBlockingQueue<>(); this.context = TestEsSinkContext.mock(dispatchQueue); } @@ -54,84 +55,54 @@ public class TestEsCallbackListener { */ @Test public void testBeforeBulk() { - try { - // prepare - ProfileEvent event = TestEsSinkContext.mockProfileEvent(); - EsIndexRequest indexRequest = context.getIndexRequestHandler().parse(context, event); - long executionId = 0; - BulkRequest bulkRequest = new BulkRequest(); - bulkRequest.add(indexRequest); - // test - EsCallbackListener listener = new EsCallbackListener(context); - listener.beforeBulk(executionId, bulkRequest); - } catch (Exception e) { - e.printStackTrace(); - } + // prepare + ProfileEvent event = TestEsSinkContext.mockProfileEvent(); + EsIndexRequest indexRequest = context.getIndexRequestHandler().parse(context, event); + long executionId = 0; + BulkRequest bulkRequest = new BulkRequest(); + bulkRequest.add(indexRequest); + // test + EsCallbackListener listener = new EsCallbackListener(context); + listener.beforeBulk(executionId, bulkRequest); } /** * testAfterSuccessBulk + * + * @throws Exception */ @Test - public void testAfterSuccessBulk() { - try { - // prepare - LinkedBlockingQueue<EsIndexRequest> dispatchQueue = new LinkedBlockingQueue<>(); - EsSinkContext context = TestEsSinkContext.mock(dispatchQueue); - ProfileEvent event = TestEsSinkContext.mockProfileEvent(); - EsIndexRequest indexRequest = context.getIndexRequestHandler().parse(context, event); - long executionId = 0; - EsCallbackListener listener = new EsCallbackListener(context); - // request - BulkRequest bulkRequest = new BulkRequest(); - bulkRequest.add(indexRequest); - // success response - IndexResponse indexResponse = new IndexResponse(); - BulkItemResponse itemResponse = new BulkItemResponse(0, OpType.INDEX, indexResponse); - BulkItemResponse[] responses = new BulkItemResponse[1]; - responses[0] = itemResponse; - BulkResponse bulkResponse = new BulkResponse(responses, 0); - listener.afterBulk(executionId, bulkRequest, bulkResponse); - - // fail resend - BulkItemResponse.Failure failure = new BulkItemResponse.Failure("index", OpType.INDEX.name(), "id", - new Exception()); - BulkItemResponse failResponse = new BulkItemResponse(0, OpType.INDEX, failure); - responses[0] = failResponse; - listener.afterBulk(executionId, bulkRequest, bulkResponse); - - // failNull noResend - BulkItemResponse.Failure failureNull = new BulkItemResponse.Failure("index", OpType.INDEX.name(), "id", - null); - BulkItemResponse failNullResponse = new BulkItemResponse(0, OpType.INDEX, failureNull); - responses[0] = failNullResponse; - listener.afterBulk(executionId, bulkRequest, bulkResponse); - } catch (Exception e) { - e.printStackTrace(); - } - } + public void testAfterSuccessBulk() throws Exception { + // prepare + LinkedBlockingQueue<EsIndexRequest> dispatchQueue = new LinkedBlockingQueue<>(); + EsSinkContext context = TestEsSinkContext.mock(dispatchQueue); + ProfileEvent event = TestEsSinkContext.mockProfileEvent(); + EsIndexRequest indexRequest = context.getIndexRequestHandler().parse(context, event); + long executionId = 0; + EsCallbackListener listener = new EsCallbackListener(context); + // request + BulkRequest bulkRequest = new BulkRequest(); + bulkRequest.add(indexRequest); + // success response + IndexResponse indexResponse = new IndexResponse(); + BulkItemResponse itemResponse = new BulkItemResponse(0, OpType.INDEX, indexResponse); + BulkItemResponse[] responses = new BulkItemResponse[1]; + responses[0] = itemResponse; + BulkResponse bulkResponse = new BulkResponse(responses, 0); + listener.afterBulk(executionId, bulkRequest, bulkResponse); - /** - * testAfterFailureBulk - */ - @Test - public void testAfterFailureBulk() { - try { - // prepare - LinkedBlockingQueue<EsIndexRequest> dispatchQueue = new LinkedBlockingQueue<>(); - EsSinkContext context = TestEsSinkContext.mock(dispatchQueue); - ProfileEvent event = TestEsSinkContext.mockProfileEvent(); - EsIndexRequest indexRequest = context.getIndexRequestHandler().parse(context, event); - long executionId = 0; - EsCallbackListener listener = new EsCallbackListener(context); - // request - BulkRequest bulkRequest = new BulkRequest(); - bulkRequest.add(indexRequest); + // fail resend + BulkItemResponse.Failure failure = new BulkItemResponse.Failure("index", OpType.INDEX.name(), "id", + new Exception()); + BulkItemResponse failResponse = new BulkItemResponse(0, OpType.INDEX, failure); + responses[0] = failResponse; + listener.afterBulk(executionId, bulkRequest, bulkResponse); - // fail resend - listener.afterBulk(executionId, bulkRequest, new Exception()); - } catch (Exception e) { - e.printStackTrace(); - } + // failNull noResend + BulkItemResponse.Failure failureNull = new BulkItemResponse.Failure("index", OpType.INDEX.name(), "id", + null); + BulkItemResponse failNullResponse = new BulkItemResponse(0, OpType.INDEX, failureNull); + responses[0] = failNullResponse; + listener.afterBulk(executionId, bulkRequest, bulkResponse); } } diff --git a/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsChannelWorker.java b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsChannelWorker.java index 5438bb5..c95b6b4 100644 --- a/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsChannelWorker.java +++ b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsChannelWorker.java @@ -20,6 +20,7 @@ package org.apache.inlong.sort.standalone.sink.elasticsearch; import java.util.concurrent.LinkedBlockingQueue; import org.apache.flume.Transaction; +import org.apache.inlong.commons.config.metrics.MetricRegister; import org.apache.inlong.sort.standalone.channel.ProfileEvent; import org.junit.Before; import org.junit.Test; @@ -34,16 +35,18 @@ import org.powermock.modules.junit4.PowerMockRunner; */ @RunWith(PowerMockRunner.class) @PowerMockIgnore("javax.management.*") -@PrepareForTest({EsSinkFactory.class}) +@PrepareForTest({EsSinkFactory.class, MetricRegister.class}) public class TestEsChannelWorker { private EsSinkContext context; /** * before + * + * @throws Exception */ @Before - public void before() { + public void before() throws Exception { LinkedBlockingQueue<EsIndexRequest> dispatchQueue = new LinkedBlockingQueue<>(); this.context = TestEsSinkContext.mock(dispatchQueue); } @@ -53,21 +56,17 @@ public class TestEsChannelWorker { */ @Test public void test() { - try { - // prepare - ProfileEvent event = TestEsSinkContext.mockProfileEvent(); - Transaction tx = this.context.getChannel().getTransaction(); - tx.begin(); - this.context.getChannel().put(event); - tx.commit(); - tx.close(); - // test - EsChannelWorker worker = new EsChannelWorker(context, 0); - worker.doRun(); - worker.start(); - worker.close(); - } catch (Exception e) { - e.printStackTrace(); - } + // prepare + ProfileEvent event = TestEsSinkContext.mockProfileEvent(); + Transaction tx = this.context.getChannel().getTransaction(); + tx.begin(); + this.context.getChannel().put(event); + tx.commit(); + tx.close(); + // test + EsChannelWorker worker = new EsChannelWorker(context, 0); + worker.doRun(); + worker.start(); + worker.close(); } } diff --git a/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsOutputChannel.java b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsOutputChannel.java index cbee385..c1facdf 100644 --- a/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsOutputChannel.java +++ b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsOutputChannel.java @@ -21,6 +21,7 @@ import static org.mockito.ArgumentMatchers.any; import java.util.concurrent.LinkedBlockingQueue; +import org.apache.inlong.commons.config.metrics.MetricRegister; import org.apache.inlong.sort.standalone.channel.ProfileEvent; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; @@ -47,7 +48,8 @@ import org.powermock.modules.junit4.PowerMockRunner; */ @RunWith(PowerMockRunner.class) @PowerMockIgnore("javax.management.*") -@PrepareForTest({EsSinkFactory.class, RestHighLevelClient.class, ClusterClient.class, IndicesClient.class}) +@PrepareForTest({EsSinkFactory.class, RestHighLevelClient.class, ClusterClient.class, IndicesClient.class, + MetricRegister.class}) public class TestEsOutputChannel { private RestHighLevelClient esClient; @@ -103,22 +105,20 @@ public class TestEsOutputChannel { /** * test + * + * @throws Exception */ @Test - public void test() { - try { - LinkedBlockingQueue<EsIndexRequest> dispatchQueue = new LinkedBlockingQueue<>(); - EsSinkContext context = TestEsSinkContext.mock(dispatchQueue); - EsOutputChannel output = new EsOutputChannel(context); - ProfileEvent event = TestEsSinkContext.mockProfileEvent(); - EsIndexRequest indexRequest = context.getIndexRequestHandler().parse(context, event); - dispatchQueue.add(indexRequest); - output.init(); - output.send(); - output.close(); - } catch (Exception e) { - e.printStackTrace(); - } + public void test() throws Exception { + LinkedBlockingQueue<EsIndexRequest> dispatchQueue = new LinkedBlockingQueue<>(); + EsSinkContext context = TestEsSinkContext.mock(dispatchQueue); + EsOutputChannel output = new EsOutputChannel(context); + ProfileEvent event = TestEsSinkContext.mockProfileEvent(); + EsIndexRequest indexRequest = context.getIndexRequestHandler().parse(context, event); + dispatchQueue.add(indexRequest); + output.init(); + output.send(); + output.close(); } } diff --git a/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsSinkContext.java b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsSinkContext.java index 5733c03..55f97e8 100644 --- a/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsSinkContext.java +++ b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/elasticsearch/TestEsSinkContext.java @@ -18,6 +18,7 @@ package org.apache.inlong.sort.standalone.sink.elasticsearch; import static org.junit.Assert.assertEquals; +import static org.mockito.ArgumentMatchers.any; import java.nio.charset.Charset; import java.util.HashMap; @@ -26,6 +27,7 @@ import java.util.concurrent.LinkedBlockingQueue; import org.apache.flume.Channel; import org.apache.flume.Context; +import org.apache.inlong.commons.config.metrics.MetricRegister; import org.apache.inlong.sort.standalone.channel.BufferQueueChannel; import org.apache.inlong.sort.standalone.channel.ProfileEvent; import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder; @@ -33,7 +35,9 @@ import org.apache.inlong.sort.standalone.config.pojo.SortTaskConfig; import org.apache.inlong.sort.standalone.utils.Constants; import org.junit.Test; import org.junit.runner.RunWith; +import org.powermock.api.mockito.PowerMockito; import org.powermock.core.classloader.annotations.PowerMockIgnore; +import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; /** @@ -42,6 +46,7 @@ import org.powermock.modules.junit4.PowerMockRunner; */ @RunWith(PowerMockRunner.class) @PowerMockIgnore("javax.management.*") +@PrepareForTest({MetricRegister.class}) public class TestEsSinkContext { public static final String TEST_INLONG_GROUP_ID = "0fc00000046"; @@ -53,8 +58,11 @@ public class TestEsSinkContext { * * @param dispatchQueue * @return + * @throws Exception */ - public static EsSinkContext mock(LinkedBlockingQueue<EsIndexRequest> dispatchQueue) { + public static EsSinkContext mock(LinkedBlockingQueue<EsIndexRequest> dispatchQueue) throws Exception { + PowerMockito.mockStatic(MetricRegister.class); + PowerMockito.doNothing().when(MetricRegister.class, "register", any()); Context context = CommonPropertiesHolder.getContext(); String sinkName = CommonPropertiesHolder.getClusterId() + "Sink"; context.put(SortTaskConfig.KEY_TASK_NAME, "sid_es_es-rmrv7g7a_v3"); @@ -93,9 +101,11 @@ public class TestEsSinkContext { /** * test + * + * @throws Exception */ @Test - public void test() { + public void test() throws Exception { LinkedBlockingQueue<EsIndexRequest> dispatchQueue = new LinkedBlockingQueue<>(); EsSinkContext context = mock(dispatchQueue); assertEquals(10, context.getBulkSizeMb());
