[
https://issues.apache.org/jira/browse/APEXMALHAR-1957?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15315066#comment-15315066
]
ASF GitHub Bot commented on APEXMALHAR-1957:
--------------------------------------------
Github user bhupeshchawda commented on a diff in the pull request:
https://github.com/apache/incubator-apex-malhar/pull/292#discussion_r65787947
--- Diff:
contrib/src/main/java/com/datatorrent/contrib/hbase/HBasePOJOInputOperator.java
---
@@ -69,116 +70,105 @@ public Object convertValue( HBaseFieldInfo fieldInfo,
Object value)
public void setup(OperatorContext context)
{
try {
- store.connect();
+ super.setup(context);
pojoType = Class.forName(pojoTypeName);
pojoType.newInstance(); //try create new instance to verify the
class.
rowSetter = PojoUtils.createSetter(pojoType,
tableInfo.getRowOrIdExpression(), String.class);
- fieldValueGenerator =
FieldValueGenerator.getFieldValueGenerator(pojoType, tableInfo.getFieldsInfo()
);
+ fieldValueGenerator =
HBaseFieldValueGenerator.getHBaseFieldValueGenerator(pojoType,
tableInfo.getFieldsInfo() );
valueConverter = new BytesValueConverter();
+ scan = new Scan();
} catch (Exception ex) {
throw new RuntimeException(ex);
}
}
@Override
- public void beginWindow(long windowId)
- {
- }
-
- @Override
- public void teardown()
- {
- try {
- store.disconnect();
- } catch (IOException ex) {
- throw new RuntimeException(ex);
- }
- }
-
- @Override
- public void emitTuples()
+ protected Object getTuple(Result result)
{
try {
- Scan scan = nextScan();
- if (scan == null)
- return;
-
- ResultScanner resultScanner = store.getTable().getScanner(scan);
-
- while (true) {
- Result result = resultScanner.next();
- if (result == null)
- break;
-
- String readRow = Bytes.toString(result.getRow());
- if( readRow.equals( lastReadRow ))
- continue;
-
- Object instance = pojoType.newInstance();
- rowSetter.set(instance, readRow);
-
- List<Cell> cells = result.listCells();
+ String readRow = Bytes.toString(result.getRow());
+ if( readRow.equals( getLastReadRow() )) {
--- End diff --
Do this in the base class.
> Improve HBasePOJOInputOperator with support for threaded read
> -------------------------------------------------------------
>
> Key: APEXMALHAR-1957
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-1957
> Project: Apache Apex Malhar
> Issue Type: Task
> Reporter: Bhupesh Chawda
> Assignee: Bhupesh Chawda
>
> Add the following support to the HBase POJO Input Operator:
> * Add support for threaded read
> * Allow specifying a set of "column family: column" pairs and fetch data only for
> these columns
> * Allow specifying an end row key at which to stop scanning
> * Add metrics
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)