[ 
https://issues.apache.org/jira/browse/FLINK-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15828327#comment-15828327
 ] 

ASF GitHub Bot commented on FLINK-2168:
---------------------------------------

Github user tonycox commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3149#discussion_r96668950
  
    --- Diff: 
flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSourceInputFormat.java
 ---
    @@ -0,0 +1,322 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.addons.hbase;
    +
    +import org.apache.flink.api.common.io.InputFormat;
    +import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
    +import org.apache.flink.api.common.io.RichInputFormat;
    +import org.apache.flink.api.common.io.statistics.BaseStatistics;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
    +import org.apache.flink.api.java.typeutils.RowTypeInfo;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.core.io.InputSplitAssigner;
    +import org.apache.flink.types.Row;
    +import org.apache.hadoop.hbase.HBaseConfiguration;
    +import org.apache.hadoop.hbase.TableName;
    +import org.apache.hadoop.hbase.TableNotFoundException;
    +import org.apache.hadoop.hbase.client.Scan;
    +import org.apache.hadoop.hbase.client.Table;
    +import org.apache.hadoop.hbase.client.ClusterConnection;
    +import org.apache.hadoop.hbase.client.Result;
    +import org.apache.hadoop.hbase.client.ResultScanner;
    +import org.apache.hadoop.hbase.client.ConnectionFactory;
    +import org.apache.hadoop.hbase.client.Connection;
    +import org.apache.hadoop.hbase.client.HRegionLocator;
    +import org.apache.hadoop.hbase.util.Bytes;
    +import org.apache.hadoop.hbase.util.Pair;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.math.BigDecimal;
    +import java.math.BigInteger;
    +import java.util.ArrayList;
    +import java.util.Date;
    +import java.util.List;
    +
    +/**
    + * {@link InputFormat} subclass that wraps the access for HTables. Returns 
the result as {@link Row}
    + */
    +public class HBaseTableSourceInputFormat extends RichInputFormat<Row, 
TableInputSplit> implements ResultTypeQueryable<Row> {
    +
    +   private static final long serialVersionUID = 1L;
    +
    +   private static final Logger LOG = LoggerFactory.getLogger(HBaseTableSourceInputFormat.class);
    +
    +   // Name of the HBase table; resolved via TableName.valueOf(...) when connecting.
    +   private String tableName;
    +   // Type information of the projected fields, indexed in parallel with fieldNames.
    +   private TypeInformation[] fieldTypeInfos;
    +   // Projected fields; each entry is split on COLON into column family and qualifier.
    +   private String[] fieldNames;
    +
    +   // Runtime handles: created in configure()/open() and excluded from the serialized form.
    +   private transient Table table;
    +   private transient Scan scan;
    +   private transient Connection conn;
    +   // Created per split in open(); it is a live scanner handle, so it is transient
    +   // like the other HBase handles above.
    +   private transient ResultScanner resultScanner = null;
    +
    +   // Row key of the most recently returned result; open() initialises it to the split's end row.
    +   private byte[] lastRow;
    +   // Number of rows returned since the current split was opened.
    +   private int scannedRows;
    +   // Set once the scanner yields no further result for the current split.
    +   private boolean endReached = false;
    +   // Connection settings; replaced by HBaseConfiguration.create() at connect time when null.
    +   // NOTE(review): this field is not transient although the class declares a
    +   // serialVersionUID -- confirm the Hadoop Configuration type can be serialized
    +   // with the format.
    +   private org.apache.hadoop.conf.Configuration conf;
    +   // Separator between column family and qualifier inside a field name.
    +   private static final String COLON = ":";
    +
    +   /**
    +    * Creates an input format that reads the given fields of an HBase table as {@link Row}s.
    +    *
    +    * @param conf           Hadoop configuration used to connect; when {@code null} a
    +    *                       configuration is created from the classpath at connect time
    +    * @param tableName      name of the HBase table to read
    +    * @param fieldNames     fields to read, each written as {@code family:qualifier}
    +    * @param fieldTypeInfos type information of the fields, indexed like {@code fieldNames}
    +    */
    +   public HBaseTableSourceInputFormat(
    +                   org.apache.hadoop.conf.Configuration conf,
    +                   String tableName,
    +                   String[] fieldNames,
    +                   TypeInformation[] fieldTypeInfos) {
    +           this.fieldTypeInfos = fieldTypeInfos;
    +           this.fieldNames = fieldNames;
    +           this.tableName = tableName;
    +           this.conf = conf;
    +   }
    +
    +   /**
    +    * Connects to the HBase table and, if that succeeded, prepares the column scan.
    +    *
    +    * @param parameters Flink configuration; not read by this implementation
    +    */
    +   @Override
    +   public void configure(Configuration parameters) {
    +           LOG.info("Initializing HBaseConfiguration");
    +           connectToTable();
    +           if (table == null) {
    +                   // The connection attempt failed and was logged; leave the scan unset.
    +                   return;
    +           }
    +           scan = createScanner();
    +   }
    +
    +   /**
    +    * Builds a {@link Scan} restricted to the columns named in {@code fieldNames}.
    +    *
    +    * @return a scan selecting exactly the configured family/qualifier pairs
    +    * @throws IllegalArgumentException if a field name is not of the form
    +    *         {@code family:qualifier}
    +    */
    +   private Scan createScanner() {
    +           // Named differently from the 'scan' field to avoid shadowing it.
    +           Scan columnScan = new Scan();
    +           for (String field : fieldNames) {
    +                   String[] famCol = field.split(COLON);
    +                   if (famCol.length < 2) {
    +                           // Fail with a descriptive message instead of an index error below.
    +                           throw new IllegalArgumentException(
    +                                   "Field name '" + field + "' must have the form 'family" + COLON + "qualifier'");
    +                   }
    +                   columnScan.addColumn(Bytes.toBytes(famCol[0]), Bytes.toBytes(famCol[1]));
    +           }
    +           return columnScan;
    +   }
    +
    +   private void connectToTable() {
    +           //use files found in the classpath
    +           if(this.conf == null) {
    +                   this.conf = HBaseConfiguration.create();
    +           }
    +           try {
    +                   conn = ConnectionFactory.createConnection(this.conf);
    +           } catch(IOException ioe) {
    +                   LOG.error("Exception while creating connection to hbase 
cluster", ioe);
    +                   return;
    +           }
    +           try {
    +                   table = conn.getTable(TableName.valueOf(tableName));
    +           } catch(TableNotFoundException tnfe) {
    +                   LOG.error("The table " + tableName + " not found ", 
tnfe);
    +           } catch(IOException ioe) {
    +                   LOG.error("Exception while connecting to the table 
"+tableName+ " ", ioe);
    +           }
    +   }
    +
    +   /**
    +    * Always returns {@code null}; this format computes no statistics.
    +    *
    +    * @param cachedStatistics previously cached statistics; ignored
    +    * @return always {@code null}
    +    */
    +   @Override
    +   public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {
    +           final BaseStatistics noStatistics = null;
    +           return noStatistics;
    +   }
    +
    +   /**
    +    * Creates one input split per table region that overlaps the configured scan range.
    +    *
    +    * <p>Regions rejected by {@link #includeRegionInSplit(byte[], byte[])} or lying
    +    * outside the scan's start/stop rows are skipped. The region location is looked up
    +    * only for regions that actually produce a split, and the region locator is closed
    +    * before returning.
    +    *
    +    * @param minNumSplits requested minimum number of splits; not used, the number of
    +    *                     splits follows the table's regions
    +    * @return the created splits, possibly empty
    +    * @throws IOException if the table or scan has not been set up, the table reports
    +    *                     no region, or a region lookup fails
    +    */
    +   @Override
    +   public TableInputSplit[] createInputSplits(final int minNumSplits) throws IOException {
    +           if (table == null) {
    +                   throw new IOException("The HBase table has not been opened!");
    +           }
    +           if (scan == null) {
    +                   throw new IOException("getScanner returned null");
    +           }
    +
    +           final List<TableInputSplit> splits;
    +           final HRegionLocator regionLocator = new HRegionLocator(table.getName(), (ClusterConnection) conn);
    +           try {
    +                   //Gets the starting and ending row keys for every region in the currently open table
    +                   final Pair<byte[][], byte[][]> keys = regionLocator.getStartEndKeys();
    +                   if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) {
    +                           throw new IOException("Expecting at least one region.");
    +                   }
    +                   final byte[][] regionStartKeys = keys.getFirst();
    +                   final byte[][] regionEndKeys = keys.getSecond();
    +
    +                   final byte[] startRow = scan.getStartRow();
    +                   final byte[] stopRow = scan.getStopRow();
    +                   final boolean scanWithNoLowerBound = startRow.length == 0;
    +                   final boolean scanWithNoUpperBound = stopRow.length == 0;
    +
    +                   // At most one split per region, so the region count bounds the list size.
    +                   splits = new ArrayList<TableInputSplit>(regionStartKeys.length);
    +                   for (int i = 0; i < regionStartKeys.length; i++) {
    +                           final byte[] startKey = regionStartKeys[i];
    +                           final byte[] endKey = regionEndKeys[i];
    +                           //Test if the given region is to be included in the InputSplit while splitting the regions of a table
    +                           if (!includeRegionInSplit(startKey, endKey)) {
    +                                   continue;
    +                           }
    +
    +                           // determine if regions contains keys used by the scan
    +                           final boolean isLastRegion = endKey.length == 0;
    +                           final boolean scanStartsBeforeRegionEnd =
    +                                   scanWithNoLowerBound || isLastRegion || Bytes.compareTo(startRow, endKey) < 0;
    +                           final boolean scanStopsAfterRegionStart =
    +                                   scanWithNoUpperBound || Bytes.compareTo(stopRow, startKey) > 0;
    +                           if (!(scanStartsBeforeRegionEnd && scanStopsAfterRegionStart)) {
    +                                   continue;
    +                           }
    +
    +                           //Finds the region on which the given row is being served
    +                           final String regionLocation = regionLocator.getRegionLocation(startKey, false).getHostnamePort();
    +                           final String[] hosts = new String[]{regionLocation};
    +
    +                           final byte[] splitStart =
    +                                   (scanWithNoLowerBound || Bytes.compareTo(startKey, startRow) >= 0) ? startKey : startRow;
    +                           final byte[] splitStop =
    +                                   ((scanWithNoUpperBound || Bytes.compareTo(endKey, stopRow) <= 0) && !isLastRegion)
    +                                           ? endKey : stopRow;
    +                           final int id = splits.size();
    +                           splits.add(new TableInputSplit(id, hosts, table.getName().getName(), splitStart, splitStop));
    +                   }
    +           } finally {
    +                   regionLocator.close();
    +           }
    +
    +           LOG.info("Created " + splits.size() + " splits");
    +           for (TableInputSplit split : splits) {
    +                   logSplitInfo("created", split);
    +           }
    +           return splits.toArray(new TableInputSplit[0]);
    +   }
    +
    +   protected boolean includeRegionInSplit(final byte[] startKey, final 
byte[] endKey) {
    +           return true;
    +   }
    +
    +   /**
    +    * Wraps the given splits in a {@link LocatableInputSplitAssigner}.
    +    *
    +    * @param inputSplits the splits to hand out
    +    * @return an assigner over {@code inputSplits}
    +    */
    +   @Override
    +   public InputSplitAssigner getInputSplitAssigner(TableInputSplit[] inputSplits) {
    +           final LocatableInputSplitAssigner assigner = new LocatableInputSplitAssigner(inputSplits);
    +           return assigner;
    +   }
    +
    +   /**
    +    * Opens a scanner over the row range of the given split and resets the read state.
    +    *
    +    * @param split the split to read
    +    * @throws IOException if the table or scan has not been set up, if {@code split}
    +    *                     is {@code null}, or if the scanner cannot be created
    +    */
    +   @Override
    +   public void open(TableInputSplit split) throws IOException {
    +           if (table == null) {
    +                   throw new IOException("The HBase table has not been opened!");
    +           }
    +           if (scan == null) {
    +                   throw new IOException("getScanner returned null");
    +           }
    +           if (split == null) {
    +                   throw new IOException("Input split is null!");
    +           }
    +
    +           logSplitInfo("opening", split);
    +
    +           // Restrict the scan to the split's row range.
    +           final byte[] rangeStart = split.getStartRow();
    +           scan.setStartRow(rangeStart);
    +           final byte[] rangeEnd = split.getEndRow();
    +           lastRow = rangeEnd;
    +           scan.setStopRow(rangeEnd);
    +
    +           resultScanner = table.getScanner(scan);
    +
    +           // Fresh read state for the new split.
    +           endReached = false;
    +           scannedRows = 0;
    +   }
    +
    +   private void logSplitInfo(String action, TableInputSplit split) {
    +           int splitId = split.getSplitNumber();
    +           String splitStart = Bytes.toString(split.getStartRow());
    +           String splitEnd = Bytes.toString(split.getEndRow());
    +           String splitStartKey = splitStart.isEmpty() ? "-" : splitStart;
    +           String splitStopKey = splitEnd.isEmpty() ? "-" : splitEnd;
    +           String[] hostnames = split.getHostnames();
    +           LOG.info("{} split (this={})[{}|{}|{}|{}]", action, this, 
splitId, hostnames, splitStartKey, splitStopKey);
    +   }
    +
    +   /**
    +    * Reports whether the current split is exhausted.
    +    *
    +    * @return {@code true} once {@code nextRecord} has found no further result
    +    */
    +   @Override
    +   public boolean reachedEnd() throws IOException {
    +           return this.endReached;
    +   }
    +
    +   /**
    +    * Reads the next HBase result of the current split and converts it to a {@link Row}.
    +    *
    +    * <p>If the scanner fails, it is closed and recreated once. When rows have already
    +    * been returned, the new scanner starts at the last returned row and that row is
    +    * skipped, so it is not emitted twice. When nothing has been returned yet, the
    +    * scan keeps the start row set in {@code open}; {@code lastRow} still holds the
    +    * split's end row at that point and must not be used as a start row. Only scanner
    +    * failures trigger the retry; row-mapping errors propagate directly.
    +    *
    +    * @param reuse ignored; a new row is created for every record
    +    * @return the next row, or {@code null} when the split is exhausted
    +    * @throws IOException if no scanner is open or the retry fails as well
    +    */
    +   @Override
    +   public Row nextRecord(Row reuse) throws IOException {
    +           if (resultScanner == null) {
    +                   throw new IOException("No table result scanner provided!");
    +           }
    +
    +           Result res;
    +           try {
    +                   res = resultScanner.next();
    +           } catch (Exception e) {
    +                   resultScanner.close();
    +                   //workaround for timeout on scan
    +                   LOG.warn("Error after scan of " + scannedRows + " rows. Retry with a new scanner...", e);
    +                   final boolean resumeAfterLastRow = scannedRows > 0;
    +                   if (resumeAfterLastRow) {
    +                           scan.setStartRow(lastRow);
    +                   }
    +                   resultScanner = table.getScanner(scan);
    +                   res = resultScanner.next();
    +                   if (resumeAfterLastRow && res != null && Bytes.equals(lastRow, res.getRow())) {
    +                           // The start row is inclusive: drop the row that was already emitted.
    +                           res = resultScanner.next();
    +                   }
    +           }
    +
    +           if (res == null) {
    +                   endReached = true;
    +                   return null;
    +           }
    +           scannedRows++;
    +           lastRow = res.getRow();
    +           return mapResultToRow(res);
    +   }
    +
    +   private Row mapResultToRow(Result res) {
    +           Object[] values = new Object[fieldNames.length];
    +           int i = 0;
    +           for(String field : fieldNames) {
    +                   String[] famCol = field.split(COLON);
    +                   byte[] value = res.getValue(Bytes.toBytes(famCol[0]), 
Bytes.toBytes(famCol[1]));
    +                   TypeInformation typeInfo = fieldTypeInfos[i];
    +                   if(typeInfo.isBasicType()) {
    +                           if(typeInfo.getTypeClass() ==  Integer.class) {
    +                                   values[i] = Bytes.toInt(value);
    +                           } else if(typeInfo.getTypeClass() == 
Short.class) {
    +                                   values[i] = Bytes.toShort(value);
    +                           } else if(typeInfo.getTypeClass() == 
Float.class) {
    +                                   values[i] = Bytes.toFloat(value);
    +                           } else if(typeInfo.getTypeClass() == 
Long.class) {
    +                                   values[i] = Bytes.toLong(value);
    +                           } else if(typeInfo.getTypeClass() == 
String.class) {
    +                                   values[i] = Bytes.toString(value);
    +                           } else if(typeInfo.getTypeClass() == 
Byte.class) {
    +                                   values[i] = value[0];
    +                           } else if(typeInfo.getTypeClass() == 
Boolean.class) {
    +                                   values[i] = Bytes.toBoolean(value);
    +                           } else if(typeInfo.getTypeClass() == 
Double.class) {
    +                                   values[i] = Bytes.toDouble(value);
    +                           } else if(typeInfo.getTypeClass() == 
BigInteger.class) {
    +                                   values[i] = new BigInteger(value);
    +                           } else if(typeInfo.getTypeClass() == 
BigDecimal.class) {
    +                                   values[i] = Bytes.toBigDecimal(value);
    +                           } else if(typeInfo.getTypeClass() == 
Date.class) {
    +                                   values[i] = new 
Date(Bytes.toLong(value));
    +                           }
    +                   } else {
    +                           // TODO for other types??
    --- End diff --
    
    Other types that still need to be handled here: arrays, java.sql Date/Time/Timestamp, and CompositeType.


> Add HBaseTableSource
> --------------------
>
>                 Key: FLINK-2168
>                 URL: https://issues.apache.org/jira/browse/FLINK-2168
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API & SQL
>    Affects Versions: 0.9
>            Reporter: Fabian Hueske
>            Assignee: ramkrishna.s.vasudevan
>            Priority: Minor
>              Labels: starter
>
> Add an {{HBaseTableSource}} to read data from an HBase table. The 
> {{HBaseTableSource}} should implement the {{ProjectableTableSource}} 
> (FLINK-3848) and {{FilterableTableSource}} (FLINK-3849) interfaces.
> The implementation can be based on Flink's {{TableInputFormat}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to