Github user fhueske commented on a diff in the pull request:
https://github.com/apache/incubator-flink/pull/220#discussion_r20675793
--- Diff: flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java ---
@@ -23,182 +23,69 @@
import java.util.ArrayList;
import java.util.List;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.flink.addons.hbase.common.HBaseKey;
-import org.apache.flink.addons.hbase.common.HBaseResult;
-import org.apache.flink.addons.hbase.common.HBaseUtil;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplitAssigner;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.OperatingSystem;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapreduce.TableRecordReader;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* {@link InputFormat} subclass that wraps the access for HTables.
+ *
+ * @author Flavio Pompermaier <[email protected]>
*/
-public class TableInputFormat implements InputFormat<Record, TableInputSplit> {
+public abstract class TableInputFormat<T extends Tuple> implements InputFormat<T, TableInputSplit>{
private static final long serialVersionUID = 1L;
private static final Logger LOG = LoggerFactory.getLogger(TableInputFormat.class);
- /** A handle on an HBase table */
- private HTable table;
-
- /** The scanner that performs the actual access on the table. HBase object */
- private Scan scan;
-
- /** Hbase' iterator wrapper */
- private TableRecordReader tableRecordReader;
-
/** helper variable to decide whether the input is exhausted or not */
private boolean endReached = false;
+
+ // TODO table and scan could be serialized when kryo serializer will be the default
+ private transient HTable table;
+ private transient Scan scan;
+
+ /** HBase iterator wrapper */
+ private ResultScanner rs;
- /** Job parameter that specifies the input table. */
- public static final String INPUT_TABLE = "hbase.inputtable";
-
- /** Location of the hbase-site.xml. If set, the HBaseAdmin will build inside */
- public static final String CONFIG_LOCATION = "hbase.config.location";
-
- /**
- * Base-64 encoded scanner. All other SCAN_ confs are ignored if this is specified.
- * See TableMapReduceUtil.convertScanToString(Scan) for more details.
- */
- public static final String SCAN = "hbase.scan";
-
- /** Column Family to Scan */
- public static final String SCAN_COLUMN_FAMILY = "hbase.scan.column.family";
-
- /** Space delimited list of columns to scan. */
- public static final String SCAN_COLUMNS = "hbase.scan.columns";
-
- /** The timestamp used to filter columns with a specific timestamp. */
- public static final String SCAN_TIMESTAMP = "hbase.scan.timestamp";
-
- /** The starting timestamp used to filter columns with a specific range of versions. */
- public static final String SCAN_TIMERANGE_START = "hbase.scan.timerange.start";
-
- /** The ending timestamp used to filter columns with a specific range of versions. */
- public static final String SCAN_TIMERANGE_END = "hbase.scan.timerange.end";
-
- /** The maximum number of version to return. */
- public static final String SCAN_MAXVERSIONS = "hbase.scan.maxversions";
-
- /** Set to false to disable server-side caching of blocks for this scan. */
- public static final String SCAN_CACHEBLOCKS = "hbase.scan.cacheblocks";
-
- /** The number of rows for caching that will be passed to scanners. */
- public static final String SCAN_CACHEDROWS = "hbase.scan.cachedrows";
-
- /** mutable objects that are used to avoid recreation of wrapper objects */
- protected HBaseKey hbaseKey;
-
- protected HBaseResult hbaseResult;
-
- private org.apache.hadoop.conf.Configuration hConf;
-
- @Override
- public void configure(Configuration parameters) {
- HTable table = createTable(parameters);
- setTable(table);
- Scan scan = createScanner(parameters);
- setScan(scan);
- }
-
+ // abstract methods allow for multiple table and scanners in the same job
+ protected abstract Scan getScanner();
+ protected abstract String getTableName();
+ protected abstract T mapResultToTuple(Result r);
--- End diff ---
Are the HBase row keys also contained in the Result, so that it is possible to
put them into the outgoing Tuple?
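
For reference, the HBase Result does expose the row key via Result.getRow(),
so a concrete subclass could map it into the outgoing tuple. Below is a
minimal sketch of such a subclass of the proposed abstract TableInputFormat,
assuming a made-up table "my-table" with column family "cf" and qualifier "q"
(all names are illustrative only, not part of the PR):

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.util.Bytes;

    // Hypothetical subclass; table/family/qualifier names are placeholders.
    public class MyTableInputFormat extends TableInputFormat<Tuple2<String, String>> {

        private static final long serialVersionUID = 1L;

        @Override
        protected Scan getScanner() {
            // Restrict the scan to a single column of the example family.
            Scan scan = new Scan();
            scan.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"));
            return scan;
        }

        @Override
        protected String getTableName() {
            return "my-table";
        }

        @Override
        protected Tuple2<String, String> mapResultToTuple(Result r) {
            // Result.getRow() returns the row key of the scanned row,
            // so the key can be emitted alongside the cell value.
            String key = Bytes.toString(r.getRow());
            String value = Bytes.toString(r.getValue(Bytes.toBytes("cf"), Bytes.toBytes("q")));
            return new Tuple2<String, String>(key, value);
        }
    }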