Added mocked unit tests for base query and partition-based query Project: http://git-wip-us.apache.org/repos/asf/metamodel/repo Commit: http://git-wip-us.apache.org/repos/asf/metamodel/commit/9f4d15e4 Tree: http://git-wip-us.apache.org/repos/asf/metamodel/tree/9f4d15e4 Diff: http://git-wip-us.apache.org/repos/asf/metamodel/diff/9f4d15e4
Branch: refs/heads/master Commit: 9f4d15e4df90d8555bbb5e0e17aaa3a3b924d3df Parents: 26641cc Author: Kasper Sørensen <[email protected]> Authored: Sun Jan 28 15:27:08 2018 -0800 Committer: Kasper Sørensen <[email protected]> Committed: Sun Jan 28 15:31:11 2018 -0800 ---------------------------------------------------------------------- kafka/pom.xml | 5 + .../metamodel/kafka/KafkaDataContext.java | 6 + .../metamodel/kafka/KafkaDataContextTest.java | 154 +++++++++++++++++++ 3 files changed, 165 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/metamodel/blob/9f4d15e4/kafka/pom.xml ---------------------------------------------------------------------- diff --git a/kafka/pom.xml b/kafka/pom.xml index 52d557a..d60adbd 100644 --- a/kafka/pom.xml +++ b/kafka/pom.xml @@ -43,5 +43,10 @@ <artifactId>junit</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>org.easymock</groupId> + <artifactId>easymock</artifactId> + <scope>test</scope> + </dependency> </dependencies> </project> http://git-wip-us.apache.org/repos/asf/metamodel/blob/9f4d15e4/kafka/src/main/java/org/apache/metamodel/kafka/KafkaDataContext.java ---------------------------------------------------------------------- diff --git a/kafka/src/main/java/org/apache/metamodel/kafka/KafkaDataContext.java b/kafka/src/main/java/org/apache/metamodel/kafka/KafkaDataContext.java index eda881c..2fa8c06 100644 --- a/kafka/src/main/java/org/apache/metamodel/kafka/KafkaDataContext.java +++ b/kafka/src/main/java/org/apache/metamodel/kafka/KafkaDataContext.java @@ -203,6 +203,12 @@ public class KafkaDataContext<K, V> extends QueryPostprocessDataContext { } private static List<Integer> toIntList(Object operand) { + if (operand == null) { + return null; + } + if (operand.getClass().isArray()) { + operand = Arrays.asList((Object[]) operand); + } final List<Integer> list = new ArrayList<>(); if (operand instanceof Iterable) { ((Iterable<?>) 
operand).forEach(o -> { http://git-wip-us.apache.org/repos/asf/metamodel/blob/9f4d15e4/kafka/src/test/java/org/apache/metamodel/kafka/KafkaDataContextTest.java ---------------------------------------------------------------------- diff --git a/kafka/src/test/java/org/apache/metamodel/kafka/KafkaDataContextTest.java b/kafka/src/test/java/org/apache/metamodel/kafka/KafkaDataContextTest.java new file mode 100644 index 0000000..4b19940 --- /dev/null +++ b/kafka/src/test/java/org/apache/metamodel/kafka/KafkaDataContextTest.java @@ -0,0 +1,154 @@ +package org.apache.metamodel.kafka; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Supplier; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.apache.metamodel.DataContext; +import org.apache.metamodel.data.DataSet; +import org.easymock.Capture; +import org.easymock.EasyMock; +import org.easymock.EasyMockSupport; +import org.junit.Assert; +import org.junit.Test; + +public class KafkaDataContextTest extends EasyMockSupport { + + @Test + public void testGetSchemaInfo() { + final ConsumerFactory consumerFactory = createMock(ConsumerFactory.class); + + replayAll(); + + final Supplier<Collection<String>> topicSupplier = () -> Arrays.asList("foo", "bar"); + final DataContext dc = new KafkaDataContext<>(String.class, String.class, consumerFactory, topicSupplier); + + verifyAll(); + + Assert.assertEquals("[foo, bar]", dc.getDefaultSchema().getTableNames().toString()); + } + + @Test + public void testQueryWithoutOptimization() { + final ConsumerFactory consumerFactory = createMock(ConsumerFactory.class); + @SuppressWarnings("unchecked") + final Consumer<String, String> 
consumer = createMock(Consumer.class); + + EasyMock.expect(consumerFactory.createConsumer("myTopic", String.class, String.class)).andReturn(consumer); + + final List<PartitionInfo> partitionInfoList = new ArrayList<>(); + partitionInfoList.add(new PartitionInfo("myTopic", 0, null, null, null)); + partitionInfoList.add(new PartitionInfo("myTopic", 1, null, null, null)); + partitionInfoList.add(new PartitionInfo("myTopic", 2, null, null, null)); + partitionInfoList.add(new PartitionInfo("myTopic", 3, null, null, null)); + + EasyMock.expect(consumer.partitionsFor("myTopic")).andReturn(partitionInfoList); + + final Capture<Collection<TopicPartition>> assignmentCapture = new Capture<>(); + consumer.assign(EasyMock.capture(assignmentCapture)); + consumer.seekToBeginning(EasyMock.anyObject()); + + final ConsumerRecords<String, String> consumerRecords1 = createConsumerRecords(1, 0, 10); + final ConsumerRecords<String, String> consumerRecords2 = createConsumerRecords(2, 20, 10); + + EasyMock.expect(consumer.poll(1000)).andReturn(consumerRecords1); + EasyMock.expect(consumer.poll(1000)).andReturn(consumerRecords2); + EasyMock.expect(consumer.poll(1000)).andReturn(null); + + consumer.unsubscribe(); + consumer.close(); + + replayAll(); + + final Supplier<Collection<String>> topicSupplier = () -> Arrays.asList("myTopic"); + final DataContext dc = new KafkaDataContext<>(String.class, String.class, consumerFactory, topicSupplier); + + final DataSet dataSet = dc.query().from("myTopic").select("partition", "offset", "value").where("key").eq( + "key2").execute(); + Assert.assertTrue(dataSet.next()); + Assert.assertEquals("Row[values=[1, 2, value2]]", dataSet.getRow().toString()); + Assert.assertTrue(dataSet.next()); + Assert.assertEquals("Row[values=[2, 22, value2]]", dataSet.getRow().toString()); + Assert.assertFalse(dataSet.next()); + dataSet.close(); + + verifyAll(); + + // assert that, without a partition filter, we assigned all four partitions of the topic + final ArrayList<TopicPartition> 
capturedAssignmentList = new ArrayList<>(assignmentCapture.getValue()); + Assert.assertEquals(4, capturedAssignmentList.size()); + Assert.assertEquals(0, capturedAssignmentList.get(0).partition()); + Assert.assertEquals(1, capturedAssignmentList.get(1).partition()); + Assert.assertEquals(2, capturedAssignmentList.get(2).partition()); + Assert.assertEquals(3, capturedAssignmentList.get(3).partition()); + } + + @Test + public void testQueryOptimizationByPartition() { + final ConsumerFactory consumerFactory = createMock(ConsumerFactory.class); + @SuppressWarnings("unchecked") + final Consumer<String, String> consumer = createMock(Consumer.class); + + EasyMock.expect(consumerFactory.createConsumer("myTopic", String.class, String.class)).andReturn(consumer); + + final Capture<Collection<TopicPartition>> assignmentCapture = new Capture<>(); + consumer.assign(EasyMock.capture(assignmentCapture)); + consumer.seekToBeginning(EasyMock.anyObject()); + + final ConsumerRecords<String, String> consumerRecords1 = createConsumerRecords(1, 0, 2); + final ConsumerRecords<String, String> consumerRecords2 = createConsumerRecords(2, 20, 1); + + EasyMock.expect(consumer.poll(1000)).andReturn(consumerRecords1); + EasyMock.expect(consumer.poll(1000)).andReturn(consumerRecords2); + EasyMock.expect(consumer.poll(1000)).andReturn(null); + + consumer.unsubscribe(); + consumer.close(); + + replayAll(); + + final Supplier<Collection<String>> topicSupplier = () -> Arrays.asList("myTopic"); + final DataContext dc = new KafkaDataContext<>(String.class, String.class, consumerFactory, topicSupplier); + + final DataSet dataSet = dc.query().from("myTopic").select("offset", "value").where("partition").in(1, 2, 42) + .execute(); + Assert.assertTrue(dataSet.next()); + Assert.assertEquals("Row[values=[0, value0]]", dataSet.getRow().toString()); + Assert.assertTrue(dataSet.next()); + Assert.assertEquals("Row[values=[1, value1]]", dataSet.getRow().toString()); + Assert.assertTrue(dataSet.next()); + 
Assert.assertEquals("Row[values=[20, value0]]", dataSet.getRow().toString()); + Assert.assertFalse(dataSet.next()); + dataSet.close(); + + verifyAll(); + + // assert that we assigned exactly the three partitions we queried for + final ArrayList<TopicPartition> capturedAssignmentList = new ArrayList<>(assignmentCapture.getValue()); + Assert.assertEquals(3, capturedAssignmentList.size()); + Assert.assertEquals(1, capturedAssignmentList.get(0).partition()); + Assert.assertEquals(2, capturedAssignmentList.get(1).partition()); + Assert.assertEquals(42, capturedAssignmentList.get(2).partition()); + } + + private ConsumerRecords<String, String> createConsumerRecords(int partition, int offset, int howMany) { + final List<ConsumerRecord<String, String>> list = new ArrayList<>(); + for (int i = 0; i < howMany; i++) { + list.add(new ConsumerRecord<String, String>("myTopic", partition, offset + i, "key" + i, "value" + i)); + } + + final Map<TopicPartition, List<ConsumerRecord<String, String>>> records = new HashMap<>(); + records.put(new TopicPartition("myTopic", partition), list); + return new ConsumerRecords<>(records); + } + +}
