[ 
https://issues.apache.org/jira/browse/FLINK-17097?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu reassigned FLINK-17097:
-------------------------------

    Assignee: xingoo

> Flink HBase Connector String field size at least equal 8
> --------------------------------------------------------
>
>                 Key: FLINK-17097
>                 URL: https://issues.apache.org/jira/browse/FLINK-17097
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / HBase
>    Affects Versions: 1.10.0
>            Reporter: xingoo
>            Assignee: xingoo
>            Priority: Major
>
> When a String field is used in the HBase connector, the rowkey must be at 
> least 8 bytes long, because the byte[] is expected to hold at least 8 bytes.
> Example:
> {code:java}
> // code placeholder
> rowkey: "1"
> {code}
> When it is used in a lookup function:
> {code:java}
> // code placeholder
> Caused by: java.lang.IllegalArgumentException: offset (0) + length (8) exceed the capacity of the array: 1
>       at org.apache.hadoop.hbase.util.Bytes.explainWrongLengthOrOffset(Bytes.java:779)
>       at org.apache.hadoop.hbase.util.Bytes.toLong(Bytes.java:753)
>       at org.apache.hadoop.hbase.util.Bytes.toLong(Bytes.java:726)
>       at org.apache.flink.addons.hbase.util.HBaseTypeUtils.deserializeToObject(HBaseTypeUtils.java:57)
>       at org.apache.flink.addons.hbase.util.HBaseReadWriteHelper.parseToRow(HBaseReadWriteHelper.java:158)
>       at org.apache.flink.addons.hbase.HBaseLookupFunction.eval(HBaseLookupFunction.java:78)
>       at StreamExecCorrelate$144.processElement(Unknown Source)
>       at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
>       at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
>       at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
>       at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
>       at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
>       at StreamExecCalc$134.processElement(Unknown Source)
>       at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
>       at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
>       at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
>       at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
>       at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
>       at SourceConversion$127.processElement(Unknown Source)
>       at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
>       at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
>       at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
>       at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
>       at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
>       at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
>       at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
>       at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
>       at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:91)
>       at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:156)
>       at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
>       at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>       at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>       at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:196)
> {code}
> Traced code:
> {code:java}
> // code placeholder
> @Internal
> public class HBaseTypeUtils {
>  private static final byte[] EMPTY_BYTES = new byte[]{};
>  /**
>   * Deserialize byte array to Java Object with the given type.
>   */
>  public static Object deserializeToObject(byte[] value, int typeIdx, Charset stringCharset) {
>   switch (typeIdx) {
>    case 0: // byte[]
>     return value;
>    case 1: // String
>     return new String(value, stringCharset);
> ...
> public String(byte bytes[], Charset charset) {
>     this(bytes, 0, bytes.length, charset);
> }{code}
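
The stack trace above fails inside Bytes.toLong, which needs 8 bytes, while the String branch in the quoted code accepts a byte[] of any length. This may mean the one-byte rowkey "1" reached a fixed-width long decoder instead of the String branch. The following is a minimal sketch of that mismatch using only the JDK. The class RowkeyDecodeSketch and its decodeLong and decodeString helpers are hypothetical stand-ins, not the connector's or HBase's code, and ByteBuffer.getLong is used here only as an assumed analogue of a fixed-width long decoder.

```java
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class RowkeyDecodeSketch {

    // Hypothetical stand-in for a fixed-width long decoder:
    // it needs 8 bytes and throws if fewer are available.
    static long decodeLong(byte[] value) {
        return ByteBuffer.wrap(value).getLong();
    }

    // Hypothetical stand-in for the String branch: any length is accepted.
    static String decodeString(byte[] value) {
        return new String(value, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // The rowkey from the report, encoded as a single byte.
        byte[] rowkey = "1".getBytes(StandardCharsets.UTF_8);

        // Decoding as a String works regardless of length.
        System.out.println("as String: " + decodeString(rowkey));

        // Decoding the same bytes as a long fails because only 1 byte is present.
        try {
            decodeLong(rowkey);
        } catch (BufferUnderflowException e) {
            System.out.println("as long: failed, only " + rowkey.length + " byte(s) available");
        }
    }
}
```

If this sketch matches what happens in the connector, the thing to check would be which type the rowkey is resolved to before deserializeToObject is called, since the String constructor itself places no minimum on the array length.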



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
