[ https://issues.apache.org/jira/browse/BAHIR-303?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

chen updated BAHIR-303:
-----------------------
    Description: 
*When I use Flink to write data (from Kafka and Hive) to Kudu, the following error occurs:*

java.lang.IllegalArgumentException: record_time cannot be set to null
    at org.apache.kudu.client.PartialRow.setNull(PartialRow.java:986)
    at org.apache.kudu.client.PartialRow.setNull(PartialRow.java:968)
    at org.apache.kudu.client.PartialRow.addObject(PartialRow.java:1077)
    at org.apache.kudu.client.PartialRow.addObject(PartialRow.java:1042)
    at org.apache.flink.connectors.kudu.connector.writer.AbstractSingleOperationMapper.createOperations(AbstractSingleOperationMapper.java:103)
    at org.apache.flink.connectors.kudu.connector.writer.KuduWriter.write(KuduWriter.java:97)

 

*Adding the code below to the AbstractSingleOperationMapper class fixes it, but I don't know why. Please take a look at this problem, thanks.*

The code looks like this:

    @Override
    public List<Operation> createOperations(T input, KuduTable table) {
        Optional<Operation> operationOpt = this.createBaseOperation(input, table);
        if (!operationOpt.isPresent()) {
            return Collections.emptyList();
        } else {
            Operation operation = (Operation)operationOpt.get();
            PartialRow partialRow = operation.getRow();

            for(int i = 0; i < this.columnNames.length; ++i) {
                Object field = this.getField(input, i);
                if (field instanceof LazyBinaryFormat) {
                    field = ((LazyBinaryFormat)field).getJavaObject();
                }

                if (field instanceof TimestampData) {
                    field = ((TimestampData)field).toTimestamp();
                }

                // ---- added code: begin ----
                // If a non-nullable column came back null, read the field again.
                if (field == null && !partialRow.getSchema().getColumn(this.columnNames[i]).isNullable()) {
                    // The result is unused; the call is part of the workaround.
                    String tmp = input.toString();

                    field = this.getField(input, i);

                    if (field instanceof LazyBinaryFormat) {
                        field = ((LazyBinaryFormat)field).getJavaObject();
                    }

                    if (field instanceof TimestampData) {
                        field = ((TimestampData)field).toTimestamp();
                    }
                }
                // ---- added code: end ----

                partialRow.addObject(this.columnNames[i], field);
            }

            return Collections.singletonList(operation);
        }
    }

The attachment includes the full error message.
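
To help narrow down which rows trigger the exception, here is a minimal, self-contained sketch of a pre-write check. It is not connector or Kudu code: the `Column` class and the `findNullViolations` method are hypothetical stand-ins introduced only for illustration. It shows one way to list the non-nullable columns that hold null in a row, so the offending input could be logged before `PartialRow.addObject` throws.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class NullColumnCheck {

    /** Hypothetical stand-in for one column of a table schema (not a Kudu class). */
    public static final class Column {
        public final String name;
        public final boolean nullable;

        public Column(String name, boolean nullable) {
            this.name = name;
            this.nullable = nullable;
        }
    }

    /** Returns the names of non-nullable columns whose value in the row is null. */
    public static List<String> findNullViolations(List<Column> schema, Object[] row) {
        if (schema.size() != row.length) {
            throw new IllegalArgumentException(
                    "schema has " + schema.size() + " columns but row has " + row.length + " values");
        }
        List<String> violations = new ArrayList<>();
        for (int i = 0; i < row.length; i++) {
            Column column = schema.get(i);
            if (row[i] == null && !column.nullable) {
                violations.add(column.name);
            }
        }
        return violations;
    }

    public static void main(String[] args) {
        // Assumed example schema: record_time is non-nullable, as in the stack trace above.
        List<Column> schema = Arrays.asList(
                new Column("id", false),
                new Column("record_time", false),
                new Column("note", true));
        Object[] row = {1L, null, null};

        // Only record_time is reported; note is nullable, so its null is allowed.
        System.out.println(findNullViolations(schema, row)); // prints [record_time]
    }
}
```

In the real mapper, the same comparison would use the table schema and the converted field values; logging the violating column names together with the input row should show whether the source data is actually null or only appears null on the first read.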

  was:
*When I use Flink to write data (from Kafka and Hive) to Kudu, the following error occurs:*

java.lang.IllegalArgumentException: record_time cannot be set to null
    at org.apache.kudu.client.PartialRow.setNull(PartialRow.java:986)
    at org.apache.kudu.client.PartialRow.setNull(PartialRow.java:968)
    at org.apache.kudu.client.PartialRow.addObject(PartialRow.java:1077)
    at org.apache.kudu.client.PartialRow.addObject(PartialRow.java:1042)
    at org.apache.flink.connectors.kudu.connector.writer.AbstractSingleOperationMapper.createOperations(AbstractSingleOperationMapper.java:103)
    at org.apache.flink.connectors.kudu.connector.writer.KuduWriter.write(KuduWriter.java:97)

 

*Adding the code below to the AbstractSingleOperationMapper class fixes it, but I don't know why. Please take a look at this problem, thanks.*

The code looks like this:

                if (field == null && !partialRow.getSchema().getColumn(this.columnNames[i]).isNullable()) {
                    String tmp = input.toString();

                    field = this.getField(input, i);

                    if (field instanceof LazyBinaryFormat) {
                        field = ((LazyBinaryFormat)field).getJavaObject();
                    }

                    if (field instanceof TimestampData) {
                        field = ((TimestampData)field).toTimestamp();
                    }
                }

 

The attachment includes the full error message.


> AbstractSingleOperationMapper.createOperations method error with 
> IllegalArgumentException
> -----------------------------------------------------------------------------------------
>
>                 Key: BAHIR-303
>                 URL: https://issues.apache.org/jira/browse/BAHIR-303
>             Project: Bahir
>          Issue Type: Bug
>          Components: Flink Streaming Connectors
>    Affects Versions: Flink-1.0
>         Environment: centos7
> flink1.13
>            Reporter: chen
>            Priority: Critical
>             Fix For: Not Applicable
>
>         Attachments: error.log
>
>



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
