GitHub user bhupeshchawda commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/292#discussion_r68172645
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/hbase/HBasePOJOInputOperator.java ---
    @@ -69,116 +70,105 @@ public Object convertValue( HBaseFieldInfo fieldInfo, Object value)
       public void setup(OperatorContext context)
       {
         try {
    -      store.connect();
    +      super.setup(context);
           pojoType = Class.forName(pojoTypeName);
           pojoType.newInstance();   //try create new instance to verify the class.
           rowSetter = PojoUtils.createSetter(pojoType, tableInfo.getRowOrIdExpression(), String.class);
    -      fieldValueGenerator = FieldValueGenerator.getFieldValueGenerator(pojoType, tableInfo.getFieldsInfo() );
    +      fieldValueGenerator = HBaseFieldValueGenerator.getHBaseFieldValueGenerator(pojoType, tableInfo.getFieldsInfo() );
           valueConverter = new BytesValueConverter();
    +      scan = new Scan();
         } catch (Exception ex) {
           throw new RuntimeException(ex);
         }
       }
     
       @Override
    -  public void beginWindow(long windowId)
    -  {
    -  }
    -
    -  @Override
    -  public void teardown()
    -  {
    -    try {
    -      store.disconnect();
    -    } catch (IOException ex) {
    -      throw new RuntimeException(ex);
    -    }
    -  }
    -
    -  @Override
    -  public void emitTuples()
    +  protected Object getTuple(Result result)
       {
         try {
    -      Scan scan = nextScan();
    -      if (scan == null)
    -        return;
    -
    -      ResultScanner resultScanner = store.getTable().getScanner(scan);
    -
    -      while (true) {
    -        Result result = resultScanner.next();
    -        if (result == null)
    -          break;
    -
    -        String readRow = Bytes.toString(result.getRow());
    -        if( readRow.equals( lastReadRow ))
    -          continue;
    -
    -        Object instance = pojoType.newInstance();
    -        rowSetter.set(instance, readRow);
    -
    -        List<Cell> cells = result.listCells();
    +      String readRow = Bytes.toString(result.getRow());
    +      if( readRow.equals( getLastReadRow() )) {
    --- End diff --
    
    This will require changing the signature of getTuple. As written, we are
    assuming that getTuple extracts the HBase row and then converts it to the
    output type. However, lastReadRow is stored in raw form. To move this check
    into the base class, we'll have to map the tuple to a raw type and then
    check whether the record is the one we saw last.
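
    One possible shape for this, as a rough sketch only: the base class
    compares the raw row key against the last one it read, and calls getTuple
    only for rows that pass that check. RawResult, ScanInputSketch,
    StringRowInput and process are hypothetical names made up for the
    illustration (RawResult stands in for the HBase Result so the snippet
    compiles on its own); none of them are part of the Malhar or HBase APIs.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the HBase Result; it only carries the raw row key.
final class RawResult {
  private final byte[] row;

  RawResult(byte[] row) {
    this.row = row;
  }

  byte[] getRow() {
    return row;
  }
}

// Hypothetical base class: the "already seen" check works on the raw row key,
// so subclasses only convert rows that are known to be new.
abstract class ScanInputSketch<T> {
  private byte[] lastReadRow; // kept in raw form, as noted in the review comment

  // Converts one raw result to the output type; no duplicate check needed here.
  protected abstract T getTuple(RawResult result);

  public List<T> process(List<RawResult> results) {
    List<T> out = new ArrayList<>();
    for (RawResult result : results) {
      byte[] row = result.getRow();
      // Skip the record if it is the one we saw last.
      if (lastReadRow != null && Arrays.equals(row, lastReadRow)) {
        continue;
      }
      T tuple = getTuple(result);
      if (tuple != null) {
        out.add(tuple);
      }
      lastReadRow = row;
    }
    return out;
  }
}

// Hypothetical subclass: the output type is simply the row key as a String.
final class StringRowInput extends ScanInputSketch<String> {
  @Override
  protected String getTuple(RawResult result) {
    return new String(result.getRow(), StandardCharsets.UTF_8);
  }
}
```

    With this split, the POJO operator's getTuple would no longer need to
    call getLastReadRow() itself. Whether that fits the existing scan
    operator hierarchy is something the PR would have to confirm.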


