carp84 commented on a change in pull request #9045: [FLINK-12955][hbase] 
Support LookupableTableSource for HBase
URL: https://github.com/apache/flink/pull/9045#discussion_r301606976
 
 

 ##########
 File path: 
flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/util/HBaseReadHelper.java
 ##########
 @@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.addons.hbase.util;
+
+import org.apache.flink.addons.hbase.HBaseTableSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.types.Row;
+
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+
+import java.nio.charset.Charset;
+
+/**
+ * A read helper for HBase. The helper can used to create a {@link Scan} and 
{@link Get}
+ * for scanning or lookuping a HBase table, and supports converting the HBase 
{@link Result}
+ * to Flink {@link Row}.
+ */
+public class HBaseReadHelper {
+
+       // family keys
+       private final byte[][] families;
+       // qualifier keys
+       private final byte[][][] qualifiers;
+       // qualifier types
+       private final int[][] qualifierTypes;
+
+       // row key index in output row
+       private final int rowKeyIndex;
+       // type of row key
+       private final int rowKeyType;
+
+       private final int fieldLength;
+       private final Charset charset;
+
+       // row which is returned
+       private Row resultRow;
+       // nested family rows
+       private Row[] familyRows;
+
+       public HBaseReadHelper(HBaseTableSchema hbaseTableSchema) {
+               this.families = hbaseTableSchema.getFamilyKeys();
+               this.qualifiers = new byte[this.families.length][][];
+               this.qualifierTypes = new int[this.families.length][];
+               this.familyRows = new Row[this.families.length];
+               String[] familyNames = hbaseTableSchema.getFamilyNames();
+               for (int f = 0; f < families.length; f++) {
+                       this.qualifiers[f] = 
hbaseTableSchema.getQualifierKeys(familyNames[f]);
+                       TypeInformation[] typeInfos = 
hbaseTableSchema.getQualifierTypes(familyNames[f]);
+                       this.qualifierTypes[f] = new int[typeInfos.length];
+                       for (int i = 0; i < typeInfos.length; i++) {
+                               qualifierTypes[f][i] = 
HBaseTypeUtils.getTypeIndex(typeInfos[i]);
+                       }
+                       this.familyRows[f] = new Row(typeInfos.length);
+               }
+               this.charset = 
Charset.forName(hbaseTableSchema.getStringCharset());
+               // row key
+               this.rowKeyIndex = hbaseTableSchema.getRowKeyIndex();
+               this.rowKeyType = hbaseTableSchema.getRowKeyTypeInfo()
+                       .map(HBaseTypeUtils::getTypeIndex)
+                       .orElse(-1);
+
+               // field length need take row key into account if it exists.
+               this.fieldLength = rowKeyIndex == -1 ? families.length : 
families.length + 1;
+
+               // prepare output rows
+               this.resultRow = new Row(fieldLength);
+       }
+
+       /**
+        * Returns an instance of Get that retrieves the matches records from 
the HBase table.
+        *
+        * @return The appropriate instance of Get for this use case.
+        */
+       public Get createGet(Object rowKey) {
+               byte[] rowkey = HBaseTypeUtils.serializeFromObject(
+                       rowKey,
+                       rowKeyType,
+                       charset);
+               Get get = new Get(rowkey);
+               for (int f = 0; f < families.length; f++) {
+                       byte[] family = families[f];
+                       for (byte[] qualifier : qualifiers[f]) {
+                               get.addColumn(family, qualifier);
+                       }
+               }
+               return get;
+       }
+
+       /**
+        * Returns an instance of Scan that retrieves the required subset of 
records from the HBase table.
+        *
+        * @return The appropriate instance of Scan for this use case.
+        */
+       public Scan createScanner() {
 
 Review comment:
   Consider change the method name to `createScan` to prevent misunderstanding. 
scan and scanner are two different things in hbase (refer to 
`HTable.getScanner` for details).

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to