[ 
https://issues.apache.org/jira/browse/BAHIR-303?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

chen updated BAHIR-303:
-----------------------
    Description: 
*When I use Flink to write data (from Kafka and Hive) to Kudu, sometimes (not 
on every row) an error like the following occurs:*

java.lang.IllegalArgumentException: record_time cannot be set to null
    at org.apache.kudu.client.PartialRow.setNull(PartialRow.java:986)
    at org.apache.kudu.client.PartialRow.setNull(PartialRow.java:968)
    at org.apache.kudu.client.PartialRow.addObject(PartialRow.java:1077)
    at org.apache.kudu.client.PartialRow.addObject(PartialRow.java:1042)
    at 
org.apache.flink.connectors.kudu.connector.writer.AbstractSingleOperationMapper.createOperations(AbstractSingleOperationMapper.java:103)
    at 
org.apache.flink.connectors.kudu.connector.writer.KuduWriter.write(KuduWriter.java:97)

 

*Adding the code below to the AbstractSingleOperationMapper class fixes it, but 
I don't know why. Please look into this problem, thanks.*

The code like :

@Override
    public List<Operation> createOperations(T input, KuduTable table) {
        Optional<Operation> operationOpt = this.createBaseOperation(input, 
table);
        if (!operationOpt.isPresent())

{             return Collections.emptyList();         }

else {
            Operation operation = (Operation)operationOpt.get();
            PartialRow partialRow = operation.getRow();

            for(int i = 0; i < this.columnNames.length; ++i) {
                Object field = this.getField(input, i);
                if (field instanceof LazyBinaryFormat)

{                     field = ((LazyBinaryFormat)field).getJavaObject();        
         }

                if (field instanceof TimestampData)

{                     field = ((TimestampData)field).toTimestamp();             
    }

                if (field == null && !partialRow.getSchema().getColumn(this.columnNames[i]).isNullable()) {
                    String tmp = input.toString();    // this code let field not null
                    field = this.getField(input, i);

                    if (field instanceof LazyBinaryFormat) {
                        field = ((LazyBinaryFormat) field).getJavaObject();
                    }

                    if (field instanceof TimestampData) {
                        field = ((TimestampData) field).toTimestamp();
                    }
                }

                partialRow.addObject(this.columnNames[i], field);
            }

            return Collections.singletonList(operation);
        }
    }

_*Attachment include the full error msg .*_

  was:
*When I use Flink to write data (from Kafka and Hive) to Kudu, an error like 
the following occurs:*

java.lang.IllegalArgumentException: record_time cannot be set to null
    at org.apache.kudu.client.PartialRow.setNull(PartialRow.java:986)
    at org.apache.kudu.client.PartialRow.setNull(PartialRow.java:968)
    at org.apache.kudu.client.PartialRow.addObject(PartialRow.java:1077)
    at org.apache.kudu.client.PartialRow.addObject(PartialRow.java:1042)
    at 
org.apache.flink.connectors.kudu.connector.writer.AbstractSingleOperationMapper.createOperations(AbstractSingleOperationMapper.java:103)
    at 
org.apache.flink.connectors.kudu.connector.writer.KuduWriter.write(KuduWriter.java:97)

 

*Adding the code below to the AbstractSingleOperationMapper class fixes it, but 
I don't know why. Please look into this problem, thanks.*

The code like :

@Override
    public List<Operation> createOperations(T input, KuduTable table) {
        Optional<Operation> operationOpt = this.createBaseOperation(input, 
table);
        if (!operationOpt.isPresent())

{             return Collections.emptyList();         }

else {
            Operation operation = (Operation)operationOpt.get();
            PartialRow partialRow = operation.getRow();

            for(int i = 0; i < this.columnNames.length; ++i) {
                Object field = this.getField(input, i);
                if (field instanceof LazyBinaryFormat)

{                     field = ((LazyBinaryFormat)field).getJavaObject();        
         }

                if (field instanceof TimestampData)

{                     field = ((TimestampData)field).toTimestamp();             
    }

                if (field == null && !partialRow.getSchema().getColumn(this.columnNames[i]).isNullable()) {
                    String tmp = input.toString();    // this code let field not null
                    field = this.getField(input, i);

                    if (field instanceof LazyBinaryFormat) {
                        field = ((LazyBinaryFormat) field).getJavaObject();
                    }

                    if (field instanceof TimestampData) {
                        field = ((TimestampData) field).toTimestamp();
                    }
                }

                partialRow.addObject(this.columnNames[i], field);
            }

            return Collections.singletonList(operation);
        }
    }

_*Attachment include the full error msg .*_


> AbstractSingleOperationMapper.createOperations method error with 
> IllegalArgumentException
> -----------------------------------------------------------------------------------------
>
>                 Key: BAHIR-303
>                 URL: https://issues.apache.org/jira/browse/BAHIR-303
>             Project: Bahir
>          Issue Type: Bug
>          Components: Flink Streaming Connectors
>    Affects Versions: Flink-1.0
>         Environment: centos7
> flink1.13
>            Reporter: chen
>            Priority: Critical
>             Fix For: Not Applicable
>
>         Attachments: error.log
>
>
> *when I use flink  write data (from kafka and hive) to kudu , sometime (not 
> every row) happens some error like :*
> java.lang.IllegalArgumentException: record_time cannot be set to null
>     at org.apache.kudu.client.PartialRow.setNull(PartialRow.java:986)
>     at org.apache.kudu.client.PartialRow.setNull(PartialRow.java:968)
>     at org.apache.kudu.client.PartialRow.addObject(PartialRow.java:1077)
>     at org.apache.kudu.client.PartialRow.addObject(PartialRow.java:1042)
>     at 
> org.apache.flink.connectors.kudu.connector.writer.AbstractSingleOperationMapper.createOperations(AbstractSingleOperationMapper.java:103)
>     at 
> org.apache.flink.connectors.kudu.connector.writer.KuduWriter.write(KuduWriter.java:97)
>  
> *Adding the code below to the AbstractSingleOperationMapper class fixes it, 
> but I don't know why. Please look into this problem, thanks.*
> The code like :
> @Override
>     public List<Operation> createOperations(T input, KuduTable table) {
>         Optional<Operation> operationOpt = this.createBaseOperation(input, 
> table);
>         if (!operationOpt.isPresent())
> {             return Collections.emptyList();         }
> else {
>             Operation operation = (Operation)operationOpt.get();
>             PartialRow partialRow = operation.getRow();
>             for(int i = 0; i < this.columnNames.length; ++i) {
>                 Object field = this.getField(input, i);
>                 if (field instanceof LazyBinaryFormat)
> {                     field = ((LazyBinaryFormat)field).getJavaObject();      
>            }
>                 if (field instanceof TimestampData)
> {                     field = ((TimestampData)field).toTimestamp();           
>       }
>                 if (field == null && !partialRow.getSchema().getColumn(this.columnNames[i]).isNullable()) {
>                     String tmp = input.toString();    // this code let field not null
>                     field = this.getField(input, i);
>                     if (field instanceof LazyBinaryFormat) {
>                         field = ((LazyBinaryFormat) field).getJavaObject();
>                     }
>                     if (field instanceof TimestampData) {
>                         field = ((TimestampData) field).toTimestamp();
>                     }
>                 }
>                 partialRow.addObject(this.columnNames[i], field);
>             }
>             return Collections.singletonList(operation);
>         }
>     }
> _*Attachment include the full error msg .*_



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to