Repository: apex-malhar Updated Branches: refs/heads/master 67b84dda4 -> 0b66f19d1
APEXMALHAR-1957: Added threading for reading data from hbase. Added support for progressive read. Added unit tests. Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/0b66f19d Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/0b66f19d Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/0b66f19d Branch: refs/heads/master Commit: 0b66f19d14a518b83059c74573dc7fdd58693788 Parents: 67b84dd Author: bhupesh <[email protected]> Authored: Thu Dec 24 18:50:25 2015 +0530 Committer: bhupesh <[email protected]> Committed: Wed Jul 13 11:53:06 2016 +0530 ---------------------------------------------------------------------- .../contrib/hbase/HBaseFieldValueGenerator.java | 59 ++++++ .../contrib/hbase/HBaseGetOperator.java | 2 +- .../contrib/hbase/HBaseInputOperator.java | 39 +--- .../contrib/hbase/HBasePOJOInputOperator.java | 156 ++++++++-------- .../contrib/hbase/HBaseScanOperator.java | 181 ++++++++++++++++++- .../contrib/hbase/HBaseGetOperatorTest.java | 8 +- .../hbase/HBasePOJOInputOperatorTest.java | 37 +++- .../contrib/hbase/HBaseScanOperatorTest.java | 6 +- 8 files changed, 351 insertions(+), 137 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0b66f19d/contrib/src/main/java/com/datatorrent/contrib/hbase/HBaseFieldValueGenerator.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/hbase/HBaseFieldValueGenerator.java b/contrib/src/main/java/com/datatorrent/contrib/hbase/HBaseFieldValueGenerator.java new file mode 100644 index 0000000..52b6f4b --- /dev/null +++ b/contrib/src/main/java/com/datatorrent/contrib/hbase/HBaseFieldValueGenerator.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.contrib.hbase; + +import java.util.List; + +import com.datatorrent.lib.util.FieldValueGenerator; +import com.datatorrent.lib.util.PojoUtils; + +/** + * A {@link FieldValueGenerator} implementation for {@link HBaseFieldInfo} + */ +public class HBaseFieldValueGenerator extends FieldValueGenerator<HBaseFieldInfo> +{ + public static final String COLON = ":"; + + @SuppressWarnings("unchecked") + protected HBaseFieldValueGenerator(final Class<?> clazz, List<HBaseFieldInfo> fieldInfos) + { + for (HBaseFieldInfo fieldInfo : fieldInfos) { + fieldInfoMap.put(fieldInfo.getFamilyName() + COLON + fieldInfo.getColumnName(), fieldInfo); + + PojoUtils.Getter<Object, Object> getter = + PojoUtils.createGetter(clazz, fieldInfo.getPojoFieldExpression(), fieldInfo.getType().getJavaType()); + fieldGetterMap.put(fieldInfo, getter); + } + + for (HBaseFieldInfo fieldInfo : fieldInfos) { + PojoUtils.Setter<Object, Object> setter = + PojoUtils.createSetter(clazz, fieldInfo.getPojoFieldExpression(), fieldInfo.getType().getJavaType()); + fieldSetterMap.put(fieldInfo, setter); + } + } + + public void setColumnValue(Object instance, String columnName, String columnFamily, Object value, + ValueConverter<HBaseFieldInfo> valueConverter) + { + HBaseFieldInfo fieldInfo = fieldInfoMap.get(columnFamily + COLON + columnName); + PojoUtils.Setter<Object, Object> setter = fieldSetterMap.get(fieldInfo); + setter.set(instance, valueConverter == null ? value : valueConverter.convertValue(fieldInfo, value)); + } + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0b66f19d/contrib/src/main/java/com/datatorrent/contrib/hbase/HBaseGetOperator.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/hbase/HBaseGetOperator.java b/contrib/src/main/java/com/datatorrent/contrib/hbase/HBaseGetOperator.java index 0f583e0..37270d5 100644 --- a/contrib/src/main/java/com/datatorrent/contrib/hbase/HBaseGetOperator.java +++ b/contrib/src/main/java/com/datatorrent/contrib/hbase/HBaseGetOperator.java @@ -47,7 +47,7 @@ public abstract class HBaseGetOperator<T> extends HBaseInputOperator<T> { try { Get get = operationGet(); - Result result = table.get(get); + Result result = getStore().getTable().get(get); KeyValue[] kvs = result.raw(); //T t = getTuple(kvs); //T t = getTuple(result); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0b66f19d/contrib/src/main/java/com/datatorrent/contrib/hbase/HBaseInputOperator.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/hbase/HBaseInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/hbase/HBaseInputOperator.java index 121e703..6f11621 100644 --- a/contrib/src/main/java/com/datatorrent/contrib/hbase/HBaseInputOperator.java +++ b/contrib/src/main/java/com/datatorrent/contrib/hbase/HBaseInputOperator.java @@ -18,10 +18,8 @@ */ package com.datatorrent.contrib.hbase; -import com.datatorrent.api.Context.OperatorContext; -import com.datatorrent.api.DefaultOutputPort; import com.datatorrent.api.InputOperator; -import java.io.IOException; +import com.datatorrent.lib.db.AbstractStoreInputOperator; /** * A base implementation of hbase input operator which derives from HBaseOperatorBase. <br> @@ -33,39 +31,6 @@ import java.io.IOException; * @param <T> The tuple type * @since 0.3.2 */ -public abstract class HBaseInputOperator<T> extends HBaseOperatorBase implements InputOperator +public abstract class HBaseInputOperator<T> extends AbstractStoreInputOperator<T, HBaseStore> implements InputOperator { - /** - * Output port that emits tuples into the DAG. - */ - public final transient DefaultOutputPort<T> outputPort = new DefaultOutputPort<T>(); - - //protected abstract T getTuple(Result result); - //protected abstract T getTuple(KeyValue kv); - - @Override - public void beginWindow(long windowId) - { - } - - @Override - public void endWindow() - { - } - - @Override - public void setup(OperatorContext context) - { - try{ - setupConfiguration(); - } catch (IOException ie) { - throw new RuntimeException(ie); - } - } - - @Override - public void teardown() - { - } - } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0b66f19d/contrib/src/main/java/com/datatorrent/contrib/hbase/HBasePOJOInputOperator.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/hbase/HBasePOJOInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/hbase/HBasePOJOInputOperator.java index 4182e84..e459ec7 100644 --- a/contrib/src/main/java/com/datatorrent/contrib/hbase/HBasePOJOInputOperator.java +++ b/contrib/src/main/java/com/datatorrent/contrib/hbase/HBasePOJOInputOperator.java @@ -18,14 +18,14 @@ */ package com.datatorrent.contrib.hbase; -import java.io.IOException; import java.util.List; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceStability.Evolving; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.util.Bytes; @@ -34,27 +34,30 @@ import com.datatorrent.lib.util.FieldValueGenerator.ValueConverter; import com.datatorrent.lib.util.PojoUtils; import com.datatorrent.lib.util.PojoUtils.Setter; import com.datatorrent.lib.util.TableInfo; +import com.datatorrent.api.Context; +import com.datatorrent.api.DefaultOutputPort; import com.datatorrent.api.Context.OperatorContext; /** + * HBasePOJOInputOperator reads data from a HBase store, converts it to a POJO and puts it on the output port. + * The read from HBase is asynchronous. * @displayName HBase Input Operator * @category Input * @tags database, nosql, pojo, hbase * @since 3.1.0 */ @Evolving -public class HBasePOJOInputOperator extends HBaseInputOperator<Object> +public class HBasePOJOInputOperator extends HBaseScanOperator<Object> { private TableInfo<HBaseFieldInfo> tableInfo; - protected HBaseStore store; private String pojoTypeName; - private String startRow; - private String lastReadRow; - protected transient Class pojoType; - private transient Setter<Object, String> rowSetter; + // Transients + protected transient Class<?> pojoType; protected transient FieldValueGenerator<HBaseFieldInfo> fieldValueGenerator; protected transient BytesValueConverter valueConverter; + private transient Scan scan; + private transient Setter<Object, String> rowSetter; public static class BytesValueConverter implements ValueConverter<HBaseFieldInfo> { @@ -65,120 +68,121 @@ public class HBasePOJOInputOperator extends HBaseInputOperator<Object> } } - @Override - public void setup(OperatorContext context) + public final transient DefaultOutputPort<Object> outputPort = new DefaultOutputPort<Object>() { - try { - store.connect(); - pojoType = Class.forName(pojoTypeName); - pojoType.newInstance(); //try create new instance to verify the class. - rowSetter = PojoUtils.createSetter(pojoType, tableInfo.getRowOrIdExpression(), String.class); - fieldValueGenerator = FieldValueGenerator.getFieldValueGenerator(pojoType, tableInfo.getFieldsInfo() ); - valueConverter = new BytesValueConverter(); - } catch (Exception ex) { - throw new RuntimeException(ex); + public void setup(com.datatorrent.api.Context.PortContext context) + { + pojoType = context.getAttributes().get(Context.PortContext.TUPLE_CLASS); } - } - - @Override - public void beginWindow(long windowId) - { - } + }; @Override - public void teardown() + public void activate(Context context) { try { - store.disconnect(); - } catch (IOException ex) { + pojoType.newInstance(); // try create new instance to verify the class. + rowSetter = PojoUtils.createSetter(pojoType, tableInfo.getRowOrIdExpression(), String.class); + fieldValueGenerator = new HBaseFieldValueGenerator(pojoType, tableInfo.getFieldsInfo()); + valueConverter = new BytesValueConverter(); + scan = new Scan(); + super.activate(context); + } catch (Exception ex) { throw new RuntimeException(ex); } } @Override - public void emitTuples() + protected Object getTuple(Result result) { try { - Scan scan = nextScan(); - if (scan == null) - return; - - ResultScanner resultScanner = store.getTable().getScanner(scan); - - while (true) { - Result result = resultScanner.next(); - if (result == null) - break; - - String readRow = Bytes.toString(result.getRow()); - if( readRow.equals( lastReadRow )) - continue; - - Object instance = pojoType.newInstance(); - rowSetter.set(instance, readRow); - - List<Cell> cells = result.listCells(); + String readRow = Bytes.toString(result.getRow()); + if( readRow.equals( getLastReadRow() )) { + return null; + } - for (Cell cell : cells) { - String columnName = Bytes.toString(CellUtil.cloneQualifier(cell)); - byte[] value = CellUtil.cloneValue(cell); - fieldValueGenerator.setColumnValue( instance, columnName, value, valueConverter ); - } + Object instance = pojoType.newInstance(); + rowSetter.set(instance, readRow); - outputPort.emit(instance); - lastReadRow = readRow; + List<Cell> cells = result.listCells(); + for (Cell cell : cells) { + String columnName = Bytes.toString(CellUtil.cloneQualifier(cell)); + String columnFamily = Bytes.toString(CellUtil.cloneFamily(cell)); + byte[] value = CellUtil.cloneValue(cell); + ((HBaseFieldValueGenerator)fieldValueGenerator).setColumnValue(instance, columnName, columnFamily, value, + valueConverter); } + setLastReadRow(readRow); + return instance; } catch (Exception e) { - throw new RuntimeException(e.getMessage()); + throw new RuntimeException(e); } - } - protected Scan nextScan() + @Override + protected Scan operationScan() { - if(lastReadRow==null && startRow==null ) - return new Scan(); - else - return new Scan( Bytes.toBytes( lastReadRow == null ? startRow : lastReadRow ) ); + if (getLastReadRow() == null && getStartRow() == null) { + // If no start row specified and no row read yet + if (scan == null) { + scan = new Scan(); + } + } else if (getEndRow() == null) { + // If only start row specified + scan.setStartRow(Bytes.toBytes(getLastReadRow() == null ? getStartRow() : getLastReadRow())); + } else { + // If end row also specified + scan.setStartRow(Bytes.toBytes(getLastReadRow() == null ? getStartRow() : getLastReadRow())); + scan.setStopRow(Bytes.toBytes(getEndRow())); + } + for (HBaseFieldInfo field : tableInfo.getFieldsInfo()) { + scan.addColumn(Bytes.toBytes(field.getFamilyName()), Bytes.toBytes(field.getColumnName())); + } + return scan; } - public HBaseStore getStore() - { - return store; - } - public void setStore(HBaseStore store) + @Override + protected void emitTuple(Object tuple) { - this.store = store; + outputPort.emit(tuple); } + /** + * Returns the {@link #tableInfo} object as configured + * @return {@link #tableInfo} + */ public TableInfo<HBaseFieldInfo> getTableInfo() { return tableInfo; } + /** + * Sets the {@link #tableInfo} object + * @param tableInfo + */ public void setTableInfo(TableInfo<HBaseFieldInfo> tableInfo) { this.tableInfo = tableInfo; } + /** + * Returns the POJO class name + * @return {@link #pojoTypeName} + */ public String getPojoTypeName() { return pojoTypeName; } + /** + * Sets the POJO class name + * @param pojoTypeName + */ public void setPojoTypeName(String pojoTypeName) { this.pojoTypeName = pojoTypeName; } - public String getStartRow() - { - return startRow; - } + private static final Logger logger = LoggerFactory.getLogger(HBasePOJOInputOperator.class); - public void setStartRow(String startRow) - { - this.startRow = startRow; - } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0b66f19d/contrib/src/main/java/com/datatorrent/contrib/hbase/HBaseScanOperator.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/hbase/HBaseScanOperator.java b/contrib/src/main/java/com/datatorrent/contrib/hbase/HBaseScanOperator.java index 1b0d657..b694e67 100644 --- a/contrib/src/main/java/com/datatorrent/contrib/hbase/HBaseScanOperator.java +++ b/contrib/src/main/java/com/datatorrent/contrib/hbase/HBaseScanOperator.java @@ -18,8 +18,19 @@ */ package com.datatorrent.contrib.hbase; +import java.io.IOException; +import java.util.Queue; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Scan; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.datatorrent.api.AutoMetric; +import com.datatorrent.api.Context; +import com.datatorrent.api.Operator; +import com.datatorrent.api.Context.OperatorContext; +import com.google.common.collect.Queues; + import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; @@ -40,27 +51,121 @@ import org.apache.hadoop.hbase.client.ResultScanner; * @tags hbase, scan, input operator * @since 0.3.2 */ -public abstract class HBaseScanOperator<T> extends HBaseInputOperator<T> +public abstract class HBaseScanOperator<T> extends HBaseInputOperator<T> implements Operator.ActivationListener<Context> { + public static final int DEF_QUEUE_SIZE = 1000; + public static final int DEF_SLEEP_MILLIS = 10; + + private String startRow; + private String endRow; + private String lastReadRow; + private int queueSize = DEF_QUEUE_SIZE; + private int sleepMillis = DEF_SLEEP_MILLIS; + private Queue<Result> resultQueue; + private volatile boolean running; + + @AutoMetric + protected long tuplesRead; + + // Transients + protected transient Scan scan; + protected transient ResultScanner scanner; + protected transient Thread readThread; + private transient String threadFailureReason = null; + + @Override + public void setup(OperatorContext context) + { + super.setup(context); + resultQueue = Queues.newLinkedBlockingQueue(queueSize); + } + + @Override + public void activate(Context context) + { + startReadThread(); + } + + protected void startReadThread() + { + try { + scan = operationScan(); + scanner = getStore().getTable().getScanner(scan); + } catch (IOException e) { + throw new RuntimeException(e); + } + readThread = new Thread(new Runnable() + { + @Override + public void run() + { + try { + Result result; + while (true) { + if ((result = scanner.next()) != null) { + while ( running && !resultQueue.offer(result)) { + Thread.sleep(sleepMillis); + } + } + } + } catch (Exception e) { + logger.debug("Exception in fetching results {}", e.getMessage()); + threadFailureReason = e.getMessage(); + } finally { + scanner.close(); + } + } + }); + readThread.start(); + } + + @Override + public void beginWindow(long windowId) + { + super.beginWindow(windowId); + tuplesRead = 0; + running = true; + } @Override public void emitTuples() { + if (!readThread.isAlive()) { + throw new RuntimeException(threadFailureReason); + } try { - HTable table = getTable(); - Scan scan = operationScan(); - ResultScanner scanner = table.getScanner(scan); - for (Result result : scanner) { - //KeyValue[] kvs = result.raw(); - //T t = getTuple(kvs); - T t = getTuple(result); - outputPort.emit(t); + Result result = resultQueue.poll(); + if (result == null) { + return; + } + T tuple = getTuple(result); + if (tuple != null) { + emitTuple(tuple); + tuplesRead++; } } catch (Exception e) { - e.printStackTrace(); + throw new RuntimeException(e); } } + protected void emitTuple(T tuple) + { + outputPort.emit(tuple); + } + + @Override + public void endWindow() + { + running = false; + super.endWindow(); + } + + @Override + public void deactivate() + { + readThread.interrupt(); + } + /** * Return a HBase Scan metric to retrieve the tuple. * The implementor should return a HBase Scan metric that specifies where to retrieve the tuple from @@ -79,4 +184,60 @@ public abstract class HBaseScanOperator<T> extends HBaseInputOperator<T> */ protected abstract T getTuple(Result result); + /** + * Returns the start row key in the table as set previously + * @return {@link #startRow} + */ + public String getStartRow() + { + return startRow; + } + + /** + * Sets the start row key in the table from where the scan should begin + * @param startRow + */ + public void setStartRow(String startRow) + { + this.startRow = startRow; + } + + /** + * Returns the end row key in the table as set previously + * @return {@link #endRow} + */ + public String getEndRow() + { + return endRow; + } + + /** + * Sets the end row key in the table where the scan should end + * @param endRow + */ + public void setEndRow(String endRow) + { + this.endRow = endRow; + } + + /** + * Returns the last read row key from the hbase table + * @return {@link #lastReadRow} + */ + public String getLastReadRow() + { + return lastReadRow; + } + + /** + * Sets the last read row key from the hbase table. After the failures, the new scan will start from this row key + * @param lastReadRow + */ + public void setLastReadRow(String lastReadRow) + { + this.lastReadRow = lastReadRow; + } + + private static final Logger logger = LoggerFactory.getLogger(HBaseScanOperator.class); + } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0b66f19d/contrib/src/test/java/com/datatorrent/contrib/hbase/HBaseGetOperatorTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/hbase/HBaseGetOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/hbase/HBaseGetOperatorTest.java index 9c60507..44f2323 100644 --- a/contrib/src/test/java/com/datatorrent/contrib/hbase/HBaseGetOperatorTest.java +++ b/contrib/src/test/java/com/datatorrent/contrib/hbase/HBaseGetOperatorTest.java @@ -51,12 +51,14 @@ public class HBaseGetOperatorTest dag.setAttribute(DAG.APPLICATION_NAME, "HBaseGetOperatorTest"); TestHBaseGetOperator thop = dag.addOperator("testhbaseget", TestHBaseGetOperator.class); + HBaseStore store = new HBaseStore(); + thop.setStore(store); HBaseTupleCollector tc = dag.addOperator("tuplecollector", HBaseTupleCollector.class); dag.addStream("ss", thop.outputPort, tc.inputPort); - thop.setTableName("table1"); - thop.setZookeeperQuorum("127.0.0.1"); - thop.setZookeeperClientPort(2181); + thop.getStore().setTableName("table1"); + thop.getStore().setZookeeperQuorum("127.0.0.1"); + thop.getStore().setZookeeperClientPort(2181); LocalMode.Controller lc = lma.getController(); lc.setHeartbeatMonitoringEnabled(false); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0b66f19d/contrib/src/test/java/com/datatorrent/contrib/hbase/HBasePOJOInputOperatorTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/hbase/HBasePOJOInputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/hbase/HBasePOJOInputOperatorTest.java index d0b3e94..4e6bb39 100644 --- a/contrib/src/test/java/com/datatorrent/contrib/hbase/HBasePOJOInputOperatorTest.java +++ b/contrib/src/test/java/com/datatorrent/contrib/hbase/HBasePOJOInputOperatorTest.java @@ -28,10 +28,13 @@ import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.junit.internal.runners.statements.Fail; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.datatorrent.api.Attribute.AttributeMap; +import com.datatorrent.api.Context; +import com.datatorrent.api.Context.OperatorContext; import com.datatorrent.api.DAG; import com.datatorrent.api.LocalMode; import com.datatorrent.api.StreamingApplication; @@ -59,17 +62,33 @@ public class HBasePOJOInputOperatorTest this.setTupleType( TestPOJO.class ); } } - + + public static class TestHBasePOJOInputOperator extends HBasePOJOInputOperator + { + @Override + public void setup(OperatorContext context) + { + try { + // Added to let the output operator insert data into hbase table before input operator can read it + Thread.sleep(1000); + } catch(InterruptedException e) { + throw new RuntimeException(e); + } + super.setup(context); + } + } + private static final Logger logger = LoggerFactory.getLogger( HBasePOJOInputOperatorTest.class ); private final int TUPLE_NUM = 1000; + private final long RUN_DURATION = 30000; // time in ms private HBaseStore store; private HBasePOJOPutOperator hbaseOutputOperator; - private HBasePOJOInputOperator hbaseInputOperator; + private TestHBasePOJOInputOperator hbaseInputOperator; @Before public void prepare() throws Exception { - hbaseInputOperator = new HBasePOJOInputOperator(); + hbaseInputOperator = new TestHBasePOJOInputOperator(); hbaseOutputOperator = new HBasePOJOPutOperator(); setupOperators(); HBaseUtil.createTable( store.getConfiguration(), store.getTableName()); @@ -104,6 +123,7 @@ public class HBasePOJOInputOperatorTest hbaseOutputOperator = dag.addOperator( OPERATOR.HBASEOUTPUT.name(), hbaseOutputOperator ); hbaseInputOperator = dag.addOperator(OPERATOR.HBASEINPUT.name(), hbaseInputOperator); + dag.setOutputPortAttribute(hbaseInputOperator.outputPort, Context.PortContext.TUPLE_CLASS, TestPOJO.class); TupleCacheOutputOperator output = dag.addOperator(OPERATOR.OUTPUT.name(), TupleCacheOutputOperator.class); @@ -120,6 +140,7 @@ public class HBasePOJOInputOperatorTest final LocalMode.Controller lc = lma.getController(); lc.runAsync(); + long start = System.currentTimeMillis(); //generator.doneLatch.await(); while(true) { @@ -128,10 +149,14 @@ public class HBasePOJOInputOperatorTest Thread.sleep(1000); } catch( Exception e ){} - + logger.info("Tuple row key: ", output.getReceivedTuples()); logger.info( "Received tuple number {}, instance is {}.", output.getReceivedTuples() == null ? 0 : output.getReceivedTuples().size(), System.identityHashCode( output ) ); - if( output.getReceivedTuples() != null && output.getReceivedTuples().size() == TUPLE_NUM ) + if( output.getReceivedTuples() != null && output.getReceivedTuples().size() == TUPLE_NUM ) { break; + } + if(System.currentTimeMillis() - start > RUN_DURATION) { + throw new RuntimeException("Testcase taking too long"); + } } lc.shutdown(); @@ -173,8 +198,6 @@ public class HBasePOJOInputOperatorTest hbaseInputOperator.setStore(store); hbaseOutputOperator.setStore(store); - hbaseInputOperator.setPojoTypeName( TestPOJO.class.getName() ); - OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext( OPERATOR_ID, new AttributeMap.DefaultAttributeMap()); hbaseInputOperator.setup(context); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0b66f19d/contrib/src/test/java/com/datatorrent/contrib/hbase/HBaseScanOperatorTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/hbase/HBaseScanOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/hbase/HBaseScanOperatorTest.java index e190841..254ef37 100644 --- a/contrib/src/test/java/com/datatorrent/contrib/hbase/HBaseScanOperatorTest.java +++ b/contrib/src/test/java/com/datatorrent/contrib/hbase/HBaseScanOperatorTest.java @@ -57,9 +57,9 @@ public class HBaseScanOperatorTest HBaseTupleCollector tc = dag.addOperator("tuplecollector", HBaseTupleCollector.class); dag.addStream("ss", thop.outputPort, tc.inputPort); - thop.setTableName("table1"); - thop.setZookeeperQuorum("127.0.0.1"); - thop.setZookeeperClientPort(2181); + thop.getStore().setTableName("table1"); + thop.getStore().setZookeeperQuorum("127.0.0.1"); + thop.getStore().setZookeeperClientPort(2181); LocalMode.Controller lc = lma.getController(); lc.setHeartbeatMonitoringEnabled(false);
