[
https://issues.apache.org/jira/browse/FLINK-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15829295#comment-15829295
]
ASF GitHub Bot commented on FLINK-2168:
---------------------------------------
Github user ramkrish86 commented on a diff in the pull request:
https://github.com/apache/flink/pull/3149#discussion_r96790136
--- Diff:
flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSourceInputFormat.java
---
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.addons.hbase;
+
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
+import org.apache.flink.api.common.io.RichInputFormat;
+import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.flink.types.Row;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.ClusterConnection;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.HRegionLocator;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+/**
+ * {@link InputFormat} subclass that wraps the access for HTables. Returns the result as {@link Row}.
+ */
+public class HBaseTableSourceInputFormat extends RichInputFormat<Row, TableInputSplit> implements ResultTypeQueryable<Row> {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final Logger LOG = LoggerFactory.getLogger(HBaseTableSourceInputFormat.class);
+ private String tableName;
+ private TypeInformation[] fieldTypeInfos;
+ private String[] fieldNames;
+ private transient Table table;
+ private transient Scan scan;
+ private transient Connection conn;
+ private transient ResultScanner resultScanner = null;
+
+ private byte[] lastRow;
+ private int scannedRows;
+ private boolean endReached = false;
+ private org.apache.hadoop.conf.Configuration conf;
+ private static final String COLON = ":";
+
+ public HBaseTableSourceInputFormat(org.apache.hadoop.conf.Configuration conf, String tableName, String[] fieldNames, TypeInformation[] fieldTypeInfos) {
+ this.conf = conf;
+ this.tableName = tableName;
+ this.fieldNames = fieldNames;
+ this.fieldTypeInfos = fieldTypeInfos;
+ }
+
+ @Override
+ public void configure(Configuration parameters) {
+ LOG.info("Initializing HBaseConfiguration");
+ connectToTable();
+ if(table != null) {
+ scan = createScanner();
+ }
+ }
+
+ private Scan createScanner() {
+ Scan scan = new Scan();
+ for(String field : fieldNames) {
+ // select only the requested fields, specified as "family:qualifier" names
+ String[] famCol = field.split(COLON);
+ scan.addColumn(Bytes.toBytes(famCol[0]), Bytes.toBytes(famCol[1]));
+ }
+ return scan;
+ }
+
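+ // Creates the HBase connection and opens the target table. Errors are logged and
+ // leave 'table' as null, which configure() and createInputSplits() guard against.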
+ private void connectToTable() {
+ //use files found in the classpath
+ if(this.conf == null) {
+ this.conf = HBaseConfiguration.create();
+ }
+ try {
+ conn = ConnectionFactory.createConnection(this.conf);
+ } catch(IOException ioe) {
+ LOG.error("Exception while creating connection to hbase
cluster", ioe);
+ return;
+ }
+ try {
+ table = conn.getTable(TableName.valueOf(tableName));
+ } catch(TableNotFoundException tnfe) {
+ LOG.error("The table " + tableName + " not found ",
tnfe);
+ } catch(IOException ioe) {
+ LOG.error("Exception while connecting to the table
"+tableName+ " ", ioe);
+ }
+ }
+
+ @Override
+ public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {
+ return null;
+ }
+
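+ // Creates one locatable input split per HBase region that overlaps the scan's
+ // row-key range, clipping split boundaries to the scan's start and stop rows.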
+ @Override
+ public TableInputSplit[] createInputSplits(final int minNumSplits) throws IOException {
+ if (table == null) {
+ throw new IOException("The HBase table has not been
opened!");
+ }
+ if (scan == null) {
+ throw new IOException("getScanner returned null");
+ }
+
+ // Gets the starting and ending row keys for every region in the currently open table
+ HRegionLocator regionLocator = new HRegionLocator(table.getName(), (ClusterConnection) conn);
+ final Pair<byte[][], byte[][]> keys = regionLocator.getStartEndKeys();
+ if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) {
+ throw new IOException("Expecting at least one region.");
+ }
+ final byte[] startRow = scan.getStartRow();
+ final byte[] stopRow = scan.getStopRow();
+ final boolean scanWithNoLowerBound = startRow.length == 0;
+ final boolean scanWithNoUpperBound = stopRow.length == 0;
+
+ final List<TableInputSplit> splits = new ArrayList<TableInputSplit>(minNumSplits);
+ for (int i = 0; i < keys.getFirst().length; i++) {
+ final byte[] startKey = keys.getFirst()[i];
+ final byte[] endKey = keys.getSecond()[i];
+ final String regionLocation = regionLocator.getRegionLocation(startKey, false).getHostnamePort();
+ // Test if the given region is to be included in the InputSplit while splitting the regions of a table
+ if (!includeRegionInSplit(startKey, endKey)) {
+ continue;
+ }
+ // Finds the region on which the given row is being served
+ final String[] hosts = new String[]{regionLocation};
+
+ // determine if regions contains keys used by the scan
+ boolean isLastRegion = endKey.length == 0;
+ if ((scanWithNoLowerBound || isLastRegion || Bytes.compareTo(startRow, endKey) < 0) &&
+ (scanWithNoUpperBound || Bytes.compareTo(stopRow, startKey) > 0)) {
+
+ final byte[] splitStart = scanWithNoLowerBound || Bytes.compareTo(startKey, startRow) >= 0 ? startKey : startRow;
+ final byte[] splitStop = (scanWithNoUpperBound || Bytes.compareTo(endKey, stopRow) <= 0) && !isLastRegion ? endKey : stopRow;
+ int id = splits.size();
+ final TableInputSplit split = new TableInputSplit(id, hosts, table.getName().getName(), splitStart, splitStop);
+ splits.add(split);
+ }
+ }
+ LOG.info("Created " + splits.size() + " splits");
+ for (TableInputSplit split : splits) {
+ logSplitInfo("created", split);
+ }
+ return splits.toArray(new TableInputSplit[0]);
+ }
+
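+ // Hook that subclasses can override to exclude certain regions from splitting;
+ // the default implementation includes every region.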
+ protected boolean includeRegionInSplit(final byte[] startKey, final byte[] endKey) {
+ return true;
+ }
+
+ @Override
+ public InputSplitAssigner getInputSplitAssigner(TableInputSplit[] inputSplits) {
+ return new LocatableInputSplitAssigner(inputSplits);
+ }
+
+ @Override
+ public void open(TableInputSplit split) throws IOException {
+ if (table == null) {
+ throw new IOException("The HBase table has not been
opened!");
+ }
+ if (scan == null) {
+ throw new IOException("getScanner returned null");
+ }
+ if (split == null) {
+ throw new IOException("Input split is null!");
+ }
+
+ logSplitInfo("opening", split);
+ // set the start row and stop row from the splits
+ scan.setStartRow(split.getStartRow());
+ lastRow = split.getEndRow();
+ scan.setStopRow(lastRow);
+
+ resultScanner = table.getScanner(scan);
+ endReached = false;
+ scannedRows = 0;
+ }
+
+ private void logSplitInfo(String action, TableInputSplit split) {
+ int splitId = split.getSplitNumber();
+ String splitStart = Bytes.toString(split.getStartRow());
+ String splitEnd = Bytes.toString(split.getEndRow());
+ String splitStartKey = splitStart.isEmpty() ? "-" : splitStart;
+ String splitStopKey = splitEnd.isEmpty() ? "-" : splitEnd;
+ String[] hostnames = split.getHostnames();
+ LOG.info("{} split (this={})[{}|{}|{}|{}]", action, this,
splitId, hostnames, splitStartKey, splitStopKey);
+ }
+
+ @Override
+ public boolean reachedEnd() throws IOException {
+ return endReached;
+ }
+
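+ // Fetches the next Result and maps it to a Row; if the scanner fails (e.g. on a
+ // scanner timeout), the scan is retried once starting from the last row seen.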
+ @Override
+ public Row nextRecord(Row reuse) throws IOException {
+ if (resultScanner == null) {
+ throw new IOException("No table result scanner
provided!");
+ }
+ try {
+ Result res = resultScanner.next();
+ if (res != null) {
+ scannedRows++;
+ lastRow = res.getRow();
+ return mapResultToRow(res);
+ }
+ } catch (Exception e) {
+ resultScanner.close();
+ //workaround for timeout on scan
+ LOG.warn("Error after scan of " + scannedRows + " rows.
Retry with a new scanner...", e);
+ scan.setStartRow(lastRow);
+ resultScanner = table.getScanner(scan);
+ Result res = resultScanner.next();
+ if (res != null) {
+ scannedRows++;
+ lastRow = res.getRow();
+ return mapResultToRow(res);
+ }
+ }
+ endReached = true;
+ return null;
+ }
+
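+ // Decodes a single HBase Result into a Row, converting each "family:qualifier"
+ // cell according to the declared field type.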
+ private Row mapResultToRow(Result res) {
+ Object[] values = new Object[fieldNames.length];
+ int i = 0;
+ for(String field : fieldNames) {
+ String[] famCol = field.split(COLON);
+ byte[] value = res.getValue(Bytes.toBytes(famCol[0]), Bytes.toBytes(famCol[1]));
+ TypeInformation typeInfo = fieldTypeInfos[i];
+ if(typeInfo.isBasicType()) {
+ if(typeInfo.getTypeClass() == Integer.class) {
+ values[i] = Bytes.toInt(value);
+ } else if(typeInfo.getTypeClass() == Short.class) {
+ values[i] = Bytes.toShort(value);
+ } else if(typeInfo.getTypeClass() == Float.class) {
+ values[i] = Bytes.toFloat(value);
+ } else if(typeInfo.getTypeClass() == Long.class) {
+ values[i] = Bytes.toLong(value);
+ } else if(typeInfo.getTypeClass() == String.class) {
+ values[i] = Bytes.toString(value);
+ } else if(typeInfo.getTypeClass() == Byte.class) {
+ values[i] = value[0];
+ } else if(typeInfo.getTypeClass() == Boolean.class) {
+ values[i] = Bytes.toBoolean(value);
+ } else if(typeInfo.getTypeClass() == Double.class) {
+ values[i] = Bytes.toDouble(value);
+ } else if(typeInfo.getTypeClass() == BigInteger.class) {
+ values[i] = new BigInteger(value);
+ } else if(typeInfo.getTypeClass() == BigDecimal.class) {
+ values[i] = Bytes.toBigDecimal(value);
+ } else if(typeInfo.getTypeClass() == Date.class) {
+ values[i] = new Date(Bytes.toLong(value));
+ }
+ } else {
+ // TODO for other types??
--- End diff --
I can see that in TableSourceITCase, which uses CsvInputFormat, the metadata is also passed as input and registered with the table. Now, in cases like HBase, how should that be done?
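For reference, the pattern in TableSourceITCase looks roughly like the sketch below - assuming the Flink 1.2-era Table API (TableEnvironment, BatchTableEnvironment, CsvTableSource); the HBaseTableSource constructor in the second half is hypothetical, mirroring the input format in this PR:

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

    // CSV case: the schema (metadata) is supplied up front and registered with the table
    CsvTableSource csvSource = new CsvTableSource(
        "/path/to/data.csv",
        new String[]{"name", "score"},
        new TypeInformation<?>[]{BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO});
    tableEnv.registerTableSource("csvTable", csvSource);

    // HBase case (hypothetical constructor): the same metadata, with "family:qualifier" field names
    tableEnv.registerTableSource("hbaseTable",
        new HBaseTableSource(conf, "testTable",
            new String[]{"f1:name", "f1:score"},
            new TypeInformation<?>[]{BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO}));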
> Add HBaseTableSource
> --------------------
>
> Key: FLINK-2168
> URL: https://issues.apache.org/jira/browse/FLINK-2168
> Project: Flink
> Issue Type: New Feature
> Components: Table API & SQL
> Affects Versions: 0.9
> Reporter: Fabian Hueske
> Assignee: ramkrishna.s.vasudevan
> Priority: Minor
> Labels: starter
>
> Add an {{HBaseTableSource}} to read data from an HBase table. The
> {{HBaseTableSource}} should implement the {{ProjectableTableSource}}
> (FLINK-3848) and {{FilterableTableSource}} (FLINK-3849) interfaces.
> The implementation can be based on Flink's {{TableInputFormat}}.
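A minimal usage sketch of the input format from this PR, for context (table and column names are illustrative; the Table API/SQL layer this issue asks for would sit on top of the resulting DataSet):

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create();
    DataSet<Row> rows = env.createInput(
        new HBaseTableSourceInputFormat(conf, "testTable",
            new String[]{"f1:q1", "f1:q2"},
            new TypeInformation[]{BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO}));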
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)