jiangbiao910 opened a new issue, #6260:
URL: https://github.com/apache/hudi/issues/6260

   **Environment Description**

   * Hudi version : 0.11.0
   * Flink version : 1.13.1
   * Hive version : 2.1.1-cdh6.2.0
   * Hadoop version : 3.0.0-cdh6.2.0
   * Storage (HDFS/S3/GCS..) : HDFS
   * Running on Docker? (yes/no) : no

   When I use the **COW** table type, the job fails with this error:
   ```
   Caused by: java.util.NoSuchElementException: No value present in Option
        at org.apache.hudi.common.util.Option.get(Option.java:89) ~[hudi-flink-bundle_2.11.jar:0.11.1]
        at org.apache.hudi.io.HoodieMergeHandle.<init>(HoodieMergeHandle.java:120) ~[hudi-flink-bundle_2.11.jar:0.11.1]
        at org.apache.hudi.io.FlinkMergeHandle.<init>(FlinkMergeHandle.java:70) ~[hudi-flink-bundle_2.11.jar:0.11.1]
        at org.apache.hudi.client.HoodieFlinkWriteClient.getOrCreateWriteHandle(HoodieFlinkWriteClient.java:491) ~[hudi-flink-bundle_2.11.jar:0.11.1]
        at org.apache.hudi.client.HoodieFlinkWriteClient.upsert(HoodieFlinkWriteClient.java:141) ~[hudi-flink-bundle_2.11.jar:0.11.1]
        at org.apache.hudi.sink.StreamWriteFunction.lambda$initWriteFunction$1(StreamWriteFunction.java:184) ~[hudi-flink-bundle_2.11.jar:0.11.1]
        at org.apache.hudi.sink.StreamWriteFunction.lambda$flushRemaining$7(StreamWriteFunction.java:461) ~[hudi-flink-bundle_2.11.jar:0.11.1]
        at java.util.LinkedHashMap$LinkedValues.forEach(LinkedHashMap.java:608) ~[?:1.8.0_181]
        at org.apache.hudi.sink.StreamWriteFunction.flushRemaining(StreamWriteFunction.java:454) ~[hudi-flink-bundle_2.11.jar:0.11.1]
        at org.apache.hudi.sink.StreamWriteFunction.snapshotState(StreamWriteFunction.java:131) ~[hudi-flink-bundle_2.11.jar:0.11.1]
        at org.apache.hudi.sink.bucket.BucketStreamWriteFunction.snapshotState(BucketStreamWriteFunction.java:100) ~[hudi-flink-bundle_2.11.jar:0.11.1]
        at org.apache.hudi.sink.common.AbstractStreamWriteFunction.snapshotState(AbstractStreamWriteFunction.java:157) ~[hudi-flink-bundle_2.11.jar:0.11.1]
        at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:89) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:218) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:169) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:371) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(SubtaskCheckpointCoordinatorImpl.java:706) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures(SubtaskCheckpointCoordinatorImpl.java:627) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:590) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:312) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$8(StreamTask.java:1086) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1070) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1026) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:122) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:250) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:61) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:428) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.barrierReceived(AbstractAlignedBarrierHandlerState.java:61) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:227) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:180) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:158) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_181]
   ```
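
   The top of the trace shows Hudi's `Option.get()` being called on an empty `Option` inside the `HoodieMergeHandle` constructor, i.e. the merge handle expected a base file for the file group being updated but found none. The call shape can be reproduced in isolation with `java.util.Optional` as a stand-in for Hudi's `Option` (the `latestBaseFile` lookup below is hypothetical, purely to mirror the pattern, not Hudi's API):

   ```java
   import java.util.NoSuchElementException;
   import java.util.Optional;

   public class EmptyOptionSketch {
       // Hypothetical stand-in for "find the latest base file of a file group".
       // In the failing COW upsert, the real lookup comes back empty even
       // though the records were routed to that file group as updates.
       static Optional<String> latestBaseFile(boolean exists) {
           return exists ? Optional.of("some-base-file.parquet") : Optional.empty();
       }

       public static void main(String[] args) {
           // Normal case: the base file is present and get() succeeds.
           System.out.println(latestBaseFile(true).get());

           // Failing case: get() on an empty Optional throws
           // NoSuchElementException, as Option.get(Option.java:89) does above.
           try {
               latestBaseFile(false).get();
           } catch (NoSuchElementException e) {
               System.out.println("caught: " + e.getMessage());
           }
       }
   }
   ```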
   
   ```sql
   create table if not exists kafka_source
   (
       uuid                 string,
       message_id           string,
       vin                  string,
       ei                   string,
       en                   string,
       event_time           string,
       n                    string,
       v                    string,
       send_time            string,
       message_receive_time string,
       etlTime as PROCTIME()
   ) with (
       'connector' = 'kafka',
       'topic' = 'signal_detail_hudi_test',
       'properties.bootstrap.servers' = '',
       'properties.group.id' = 'bigdata_es_test0728COW',
       'scan.startup.mode' = 'group-offsets',
       'format' = 'json',
       'json.fail-on-missing-field' = 'false',
       'json.ignore-parse-errors' = 'true',
       'json.map-null-key.mode' = 'DROP',
       'json.encode.decimal-as-plain-number' = 'true'
   );

   CREATE TABLE ods_hudi_sink
   (
       uuid                 string,
       message_id           string,
       vin                  string,
       ei                   string,
       en                   string,
       event_time           string,
       n                    string,
       v                    string,
       send_time            string,
       message_receive_time string,
       etl_update_ime       string,
       dt                   string,
       hh                   string
   ) partitioned by (dt, hh)
   with (
       'connector' = 'hudi',
       'path' = 'hdfs:///tmp/dev_zone_ods_es33_misc_signal_detail_hudi_test0728cow1',
       'table.type' = 'COPY_ON_WRITE',
       'changelog.enabled' = 'true',
       'write.precombine.field' = 'message_receive_time',
       'hoodie.datasource.write.recordkey.field' = 'uuid',
       'write.tasks' = '4',
       'compaction.tasks' = '4',
       'compaction.async.enabled' = 'true',
       'compaction.trigger.strategy' = 'num_commits',
       'compaction.delta_commits' = '5',
       'compaction.delta_seconds' = '180',
       'compaction.max_memory' = '1024',
       'read.streaming.enabled' = 'true',
       'read.streaming.check.interval' = '3',
       'hive_sync.enable' = 'true',
       'hive_sync.mode' = 'hms',
       'hive_sync.metastore.uris' = '',
       'hive_sync.db' = 'zone_test',
       'hive_sync.table' = 'hudi_test0728cow1',
       -- 'hive_sync.username' = 'hivetest',
       'hive_sync.support_timestamp' = 'true',
       'hoodie.cleaner.commits.retained' = '2',
       'index.type' = 'BUCKET',
       'hoodie.bucket.index.num.buckets' = '3'
   );
   ```
   
   The MOR table type works fine with the same pipeline. What can I do next?
   

