[ 
https://issues.apache.org/jira/browse/APEXMALHAR-1966?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15228275#comment-15228275
 ] 

ASF GitHub Bot commented on APEXMALHAR-1966:
--------------------------------------------

Github user bhupeshchawda commented on a diff in the pull request:

    
https://github.com/apache/incubator-apex-malhar/pull/227#discussion_r58708006
  
    --- Diff: 
contrib/src/test/java/com/datatorrent/contrib/cassandra/CassandraOperatorTest.java
 ---
    @@ -269,15 +269,38 @@ public void testCassandraOutputOperator()
         fieldInfos.add(new FieldInfo("set1", "set1", null));
         fieldInfos.add(new FieldInfo("test", "test", null));
     
    -    outputOperator.setStore(transactionalStore);
         outputOperator.setFieldInfos(fieldInfos);
    +    outputOperator.input.setup(tpc);
         outputOperator.setup(context);
    +    outputOperator.activate(context);
     
    -    Attribute.AttributeMap.DefaultAttributeMap portAttributes = new 
Attribute.AttributeMap.DefaultAttributeMap();
    -    portAttributes.put(Context.PortContext.TUPLE_CLASS, TestPojo.class);
    -    TestPortContext tpc = new TestPortContext(portAttributes);
    +    List<TestPojo> events = Lists.newArrayList();
    +    for (int i = 0; i < 3; i++) {
    +      Set<Integer> set = new HashSet<Integer>();
    +      set.add(i);
    +      List<Integer> list = new ArrayList<Integer>();
    +      list.add(i);
    +      Map<String, Integer> map = new HashMap<String, Integer>();
    +      map.put("key" + i, i);
    +      events.add(new TestPojo(UUID.randomUUID(), i, "abclast", true, i, 
2.0, set, list, map, new Date(System.currentTimeMillis())));
    +    }
    +
    +    outputOperator.beginWindow(0);
    +    for (TestPojo event : events) {
    +      outputOperator.input.process(event);
    +    }
    +    outputOperator.endWindow();
    +
    +    Assert.assertEquals("rows in db", 3, 
outputOperator.getNumOfEventsInStore());
    +    outputOperator.getEventsInStore();
    +  }
     
    +  @Test
    +  public void testPopulateFieldInfo()
    +  {
    +    TestOutputOperator outputOperator = setupForOutputOperatorTest();
         outputOperator.input.setup(tpc);
    +    outputOperator.setup(context);
    --- End diff --
    
    Check the order of setup for the input port and the operator


> Cassandra output operator improvements
> --------------------------------------
>
>                 Key: APEXMALHAR-1966
>                 URL: https://issues.apache.org/jira/browse/APEXMALHAR-1966
>             Project: Apache Apex Malhar
>          Issue Type: Improvement
>            Reporter: Priyanka Gugale
>            Assignee: Priyanka Gugale
>
> Update existing Cassandra output operator to:
> 1. Accept user-defined parameterized queries; the queries could be for update, 
> insert or delete.
> 2. Add error port to emit tuples which couldn't be written to database.
> 3. Add metrics
> 4. Provide a way to restrict batch size



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to