This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 7bf7c17 [Improve]Improve the unit test case of DorisWriter (#34)
7bf7c17 is described below
commit 7bf7c171a5add364622f292e70e2119fb894d686
Author: wudongliang <[email protected]>
AuthorDate: Tue Jul 2 16:26:38 2024 +0800
[Improve]Improve the unit test case of DorisWriter (#34)
---
.../kafka/connector/writer/TestCopyIntoWriter.java | 22 +++++++++-------------
.../connector/writer/TestStreamLoadWriter.java | 21 ++++++++-------------
2 files changed, 17 insertions(+), 26 deletions(-)
diff --git a/src/test/java/org/apache/doris/kafka/connector/writer/TestCopyIntoWriter.java b/src/test/java/org/apache/doris/kafka/connector/writer/TestCopyIntoWriter.java
index 302b9ea..60bcd81 100644
--- a/src/test/java/org/apache/doris/kafka/connector/writer/TestCopyIntoWriter.java
+++ b/src/test/java/org/apache/doris/kafka/connector/writer/TestCopyIntoWriter.java
@@ -32,7 +32,6 @@ import java.util.Properties;
import org.apache.doris.kafka.connector.cfg.DorisOptions;
import org.apache.doris.kafka.connector.cfg.DorisSinkConnectorConfig;
import org.apache.doris.kafka.connector.connection.JdbcConnectionProvider;
-import org.apache.doris.kafka.connector.exception.CopyLoadException;
import org.apache.doris.kafka.connector.metrics.DorisConnectMonitor;
import org.apache.doris.kafka.connector.writer.load.CopyLoad;
import org.apache.kafka.connect.sink.SinkRecord;
@@ -67,22 +66,14 @@ public class TestCopyIntoWriter {
dorisOptions = new DorisOptions((Map) props);
}
- @Test(expected = CopyLoadException.class)
+ @Test
public void fetchOffset() {
- DorisConnectMonitor dorisConnectMonitor =
mock(DorisConnectMonitor.class);
- dorisWriter =
- new CopyIntoWriter(
- "test5",
- 0,
- dorisOptions,
- new JdbcConnectionProvider(dorisOptions),
- dorisConnectMonitor);
+ dorisWriter = mockCopyIntoWriter(new String[] {});
dorisWriter.fetchOffset();
Assert.assertEquals(-1l,
dorisWriter.getOffsetPersistedInDoris().longValue());
}
- @Test
- public void fetchOffsetTest() {
+ private CopyIntoWriter mockCopyIntoWriter(String[] listLoadFiles) {
DorisConnectMonitor dorisConnectMonitor =
mock(DorisConnectMonitor.class);
CopyIntoWriter copyIntoWriter =
spy(
@@ -93,7 +84,12 @@ public class TestCopyIntoWriter {
new JdbcConnectionProvider(dorisOptions),
dorisConnectMonitor));
doReturn(Arrays.asList(listLoadFiles)).when(copyIntoWriter).listLoadFiles();
- dorisWriter = copyIntoWriter;
+ return copyIntoWriter;
+ }
+
+ @Test
+ public void fetchOffsetTest() {
+ dorisWriter = mockCopyIntoWriter(listLoadFiles);
dorisWriter.fetchOffset();
System.out.println(dorisWriter.getOffsetPersistedInDoris().longValue());
Assert.assertEquals(168172036,
dorisWriter.getOffsetPersistedInDoris().longValue());
diff --git a/src/test/java/org/apache/doris/kafka/connector/writer/TestStreamLoadWriter.java b/src/test/java/org/apache/doris/kafka/connector/writer/TestStreamLoadWriter.java
index 7e44a2d..ea54211 100644
--- a/src/test/java/org/apache/doris/kafka/connector/writer/TestStreamLoadWriter.java
+++ b/src/test/java/org/apache/doris/kafka/connector/writer/TestStreamLoadWriter.java
@@ -36,7 +36,6 @@ import java.util.Properties;
import org.apache.doris.kafka.connector.cfg.DorisOptions;
import org.apache.doris.kafka.connector.cfg.DorisSinkConnectorConfig;
import org.apache.doris.kafka.connector.connection.JdbcConnectionProvider;
-import org.apache.doris.kafka.connector.exception.StreamLoadException;
import org.apache.doris.kafka.connector.metrics.DorisConnectMonitor;
import org.apache.doris.kafka.connector.writer.commit.DorisCommittable;
import org.apache.doris.kafka.connector.writer.load.DorisStreamLoad;
@@ -82,22 +81,14 @@ public class TestStreamLoadWriter {
"VISIBLE");
}
- @Test(expected = StreamLoadException.class)
+ @Test
public void fetchOffset() {
- DorisConnectMonitor dorisConnectMonitor =
mock(DorisConnectMonitor.class);
- dorisWriter =
- new StreamLoadWriter(
- "avro-complex10",
- 2,
- dorisOptions,
- new JdbcConnectionProvider(dorisOptions),
- dorisConnectMonitor);
+ dorisWriter = mockStreamLoadWriter(new HashMap<>());
dorisWriter.fetchOffset();
Assert.assertEquals(-1l,
dorisWriter.getOffsetPersistedInDoris().longValue());
}
- @Test
- public void fetchOffsetTest() {
+ private StreamLoadWriter mockStreamLoadWriter(Map<String, String> label2Status) {
DorisConnectMonitor dorisConnectMonitor =
mock(DorisConnectMonitor.class);
StreamLoadWriter streamLoadWriter =
spy(
@@ -109,8 +100,12 @@ public class TestStreamLoadWriter {
dorisConnectMonitor));
doReturn(label2Status).when(streamLoadWriter).fetchLabel2Status();
- dorisWriter = streamLoadWriter;
+ return streamLoadWriter;
+ }
+ @Test
+ public void fetchOffsetTest() {
+ dorisWriter = mockStreamLoadWriter(label2Status);
dorisWriter.fetchOffset();
System.out.println(dorisWriter.getOffsetPersistedInDoris().longValue());
Assert.assertEquals(832,
dorisWriter.getOffsetPersistedInDoris().longValue());
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]