Added mocked unit tests for base query and partition-based query

Project: http://git-wip-us.apache.org/repos/asf/metamodel/repo
Commit: http://git-wip-us.apache.org/repos/asf/metamodel/commit/9f4d15e4
Tree: http://git-wip-us.apache.org/repos/asf/metamodel/tree/9f4d15e4
Diff: http://git-wip-us.apache.org/repos/asf/metamodel/diff/9f4d15e4

Branch: refs/heads/master
Commit: 9f4d15e4df90d8555bbb5e0e17aaa3a3b924d3df
Parents: 26641cc
Author: Kasper Sørensen <[email protected]>
Authored: Sun Jan 28 15:27:08 2018 -0800
Committer: Kasper Sørensen <[email protected]>
Committed: Sun Jan 28 15:31:11 2018 -0800

----------------------------------------------------------------------
 kafka/pom.xml                                   |   5 +
 .../metamodel/kafka/KafkaDataContext.java       |   6 +
 .../metamodel/kafka/KafkaDataContextTest.java   | 154 +++++++++++++++++++
 3 files changed, 165 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metamodel/blob/9f4d15e4/kafka/pom.xml
----------------------------------------------------------------------
diff --git a/kafka/pom.xml b/kafka/pom.xml
index 52d557a..d60adbd 100644
--- a/kafka/pom.xml
+++ b/kafka/pom.xml
@@ -43,5 +43,10 @@
                        <artifactId>junit</artifactId>
                        <scope>test</scope>
                </dependency>
+               <dependency>
+                       <groupId>org.easymock</groupId>
+                       <artifactId>easymock</artifactId>
+                       <scope>test</scope>
+               </dependency>
        </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/metamodel/blob/9f4d15e4/kafka/src/main/java/org/apache/metamodel/kafka/KafkaDataContext.java
----------------------------------------------------------------------
diff --git 
a/kafka/src/main/java/org/apache/metamodel/kafka/KafkaDataContext.java 
b/kafka/src/main/java/org/apache/metamodel/kafka/KafkaDataContext.java
index eda881c..2fa8c06 100644
--- a/kafka/src/main/java/org/apache/metamodel/kafka/KafkaDataContext.java
+++ b/kafka/src/main/java/org/apache/metamodel/kafka/KafkaDataContext.java
@@ -203,6 +203,12 @@ public class KafkaDataContext<K, V> extends 
QueryPostprocessDataContext {
     }
 
     private static List<Integer> toIntList(Object operand) {
+        if (operand == null) {
+            return null;
+        }
+        if (operand.getClass().isArray()) {
+            operand = Arrays.asList((Object[]) operand);
+        }
         final List<Integer> list = new ArrayList<>();
         if (operand instanceof Iterable) {
             ((Iterable<?>) operand).forEach(o -> {

http://git-wip-us.apache.org/repos/asf/metamodel/blob/9f4d15e4/kafka/src/test/java/org/apache/metamodel/kafka/KafkaDataContextTest.java
----------------------------------------------------------------------
diff --git 
a/kafka/src/test/java/org/apache/metamodel/kafka/KafkaDataContextTest.java 
b/kafka/src/test/java/org/apache/metamodel/kafka/KafkaDataContextTest.java
new file mode 100644
index 0000000..4b19940
--- /dev/null
+++ b/kafka/src/test/java/org/apache/metamodel/kafka/KafkaDataContextTest.java
@@ -0,0 +1,154 @@
+package org.apache.metamodel.kafka;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Supplier;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.metamodel.DataContext;
+import org.apache.metamodel.data.DataSet;
+import org.easymock.Capture;
+import org.easymock.EasyMock;
+import org.easymock.EasyMockSupport;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class KafkaDataContextTest extends EasyMockSupport {
+
+    @Test
+    public void testGetSchemaInfo() {
+        final ConsumerFactory consumerFactory = 
createMock(ConsumerFactory.class);
+
+        replayAll();
+
+        final Supplier<Collection<String>> topicSupplier = () -> 
Arrays.asList("foo", "bar");
+        final DataContext dc = new KafkaDataContext<>(String.class, 
String.class, consumerFactory, topicSupplier);
+
+        verifyAll();
+
+        Assert.assertEquals("[foo, bar]", 
dc.getDefaultSchema().getTableNames().toString());
+    }
+
+    @Test
+    public void testQueryWithoutOptimization() {
+        final ConsumerFactory consumerFactory = 
createMock(ConsumerFactory.class);
+        @SuppressWarnings("unchecked")
+        final Consumer<String, String> consumer = createMock(Consumer.class);
+
+        EasyMock.expect(consumerFactory.createConsumer("myTopic", 
String.class, String.class)).andReturn(consumer);
+
+        final List<PartitionInfo> partitionInfoList = new ArrayList<>();
+        partitionInfoList.add(new PartitionInfo("myTopic", 0, null, null, 
null));
+        partitionInfoList.add(new PartitionInfo("myTopic", 1, null, null, 
null));
+        partitionInfoList.add(new PartitionInfo("myTopic", 2, null, null, 
null));
+        partitionInfoList.add(new PartitionInfo("myTopic", 3, null, null, 
null));
+
+        
EasyMock.expect(consumer.partitionsFor("myTopic")).andReturn(partitionInfoList);
+
+        final Capture<Collection<TopicPartition>> assignmentCapture = new 
Capture<>();
+        consumer.assign(EasyMock.capture(assignmentCapture));
+        consumer.seekToBeginning(EasyMock.anyObject());
+
+        final ConsumerRecords<String, String> consumerRecords1 = 
createConsumerRecords(1, 0, 10);
+        final ConsumerRecords<String, String> consumerRecords2 = 
createConsumerRecords(2, 20, 10);
+
+        EasyMock.expect(consumer.poll(1000)).andReturn(consumerRecords1);
+        EasyMock.expect(consumer.poll(1000)).andReturn(consumerRecords2);
+        EasyMock.expect(consumer.poll(1000)).andReturn(null);
+
+        consumer.unsubscribe();
+        consumer.close();
+
+        replayAll();
+
+        final Supplier<Collection<String>> topicSupplier = () -> 
Arrays.asList("myTopic");
+        final DataContext dc = new KafkaDataContext<>(String.class, 
String.class, consumerFactory, topicSupplier);
+
+        final DataSet dataSet = dc.query().from("myTopic").select("partition", 
"offset", "value").where("key").eq(
+                "key2").execute();
+        Assert.assertTrue(dataSet.next());
+        Assert.assertEquals("Row[values=[1, 2, value2]]", 
dataSet.getRow().toString());
+        Assert.assertTrue(dataSet.next());
+        Assert.assertEquals("Row[values=[2, 22, value2]]", 
dataSet.getRow().toString());
+        Assert.assertFalse(dataSet.next());
+        dataSet.close();
+
+        verifyAll();
+
+        // assert that we assigned all four partitions reported for the topic
+        final ArrayList<TopicPartition> capturedAssignmentList = new 
ArrayList<>(assignmentCapture.getValue());
+        Assert.assertEquals(4, capturedAssignmentList.size());
+        Assert.assertEquals(0, capturedAssignmentList.get(0).partition());
+        Assert.assertEquals(1, capturedAssignmentList.get(1).partition());
+        Assert.assertEquals(2, capturedAssignmentList.get(2).partition());
+        Assert.assertEquals(3, capturedAssignmentList.get(3).partition());
+    }
+
+    @Test
+    public void testQueryOptimizationByPartition() {
+        final ConsumerFactory consumerFactory = 
createMock(ConsumerFactory.class);
+        @SuppressWarnings("unchecked")
+        final Consumer<String, String> consumer = createMock(Consumer.class);
+
+        EasyMock.expect(consumerFactory.createConsumer("myTopic", 
String.class, String.class)).andReturn(consumer);
+
+        final Capture<Collection<TopicPartition>> assignmentCapture = new 
Capture<>();
+        consumer.assign(EasyMock.capture(assignmentCapture));
+        consumer.seekToBeginning(EasyMock.anyObject());
+
+        final ConsumerRecords<String, String> consumerRecords1 = 
createConsumerRecords(1, 0, 2);
+        final ConsumerRecords<String, String> consumerRecords2 = 
createConsumerRecords(2, 20, 1);
+
+        EasyMock.expect(consumer.poll(1000)).andReturn(consumerRecords1);
+        EasyMock.expect(consumer.poll(1000)).andReturn(consumerRecords2);
+        EasyMock.expect(consumer.poll(1000)).andReturn(null);
+
+        consumer.unsubscribe();
+        consumer.close();
+
+        replayAll();
+
+        final Supplier<Collection<String>> topicSupplier = () -> 
Arrays.asList("myTopic");
+        final DataContext dc = new KafkaDataContext<>(String.class, 
String.class, consumerFactory, topicSupplier);
+
+        final DataSet dataSet = dc.query().from("myTopic").select("offset", 
"value").where("partition").in(1, 2, 42)
+                .execute();
+        Assert.assertTrue(dataSet.next());
+        Assert.assertEquals("Row[values=[0, value0]]", 
dataSet.getRow().toString());
+        Assert.assertTrue(dataSet.next());
+        Assert.assertEquals("Row[values=[1, value1]]", 
dataSet.getRow().toString());
+        Assert.assertTrue(dataSet.next());
+        Assert.assertEquals("Row[values=[20, value0]]", 
dataSet.getRow().toString());
+        Assert.assertFalse(dataSet.next());
+        dataSet.close();
+
+        verifyAll();
+
+        // assert that we assigned exactly the three partitions we queried for
+        final ArrayList<TopicPartition> capturedAssignmentList = new 
ArrayList<>(assignmentCapture.getValue());
+        Assert.assertEquals(3, capturedAssignmentList.size());
+        Assert.assertEquals(1, capturedAssignmentList.get(0).partition());
+        Assert.assertEquals(2, capturedAssignmentList.get(1).partition());
+        Assert.assertEquals(42, capturedAssignmentList.get(2).partition());
+    }
+
+    private ConsumerRecords<String, String> createConsumerRecords(int 
partition, int offset, int howMany) {
+        final List<ConsumerRecord<String, String>> list = new ArrayList<>();
+        for (int i = 0; i < howMany; i++) {
+            list.add(new ConsumerRecord<String, String>("myTopic", partition, 
offset + i, "key" + i, "value" + i));
+        }
+
+        final Map<TopicPartition, List<ConsumerRecord<String, String>>> 
records = new HashMap<>();
+        records.put(new TopicPartition("myTopic", partition), list);
+        return new ConsumerRecords<>(records);
+    }
+
+}

Reply via email to