simonqin commented on issue #1021: how can i deal this problem when partition's 
value changed with the same row_key? 
URL: https://github.com/apache/incubator-hudi/issues/1021#issuecomment-555870581
 
 
   I tested three versions, such as 0.4.7, 0.5.0, master,all of them have 
errors.
   here is my test code.This is modified according to the run method in 
HoodieClientExample.java of hudi-client.:
   ```)
   public void runTest() throws Exception {
   
       SparkConf sparkConf = new 
SparkConf().setAppName("hoodie-client-example");
       sparkConf.setMaster("local[1]");
       sparkConf.set("spark.serializer", 
"org.apache.spark.serializer.KryoSerializer");
       sparkConf.set("spark.kryoserializer.buffer.max", "512m");
       JavaSparkContext jsc = new JavaSparkContext(sparkConf);
   
       // Generator of some records to be loaded in.
       HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
   
       // initialize the table, if not done already
       Path path = new Path(tablePath);
       FileSystem fs = FSUtils.getFs(tablePath, jsc.hadoopConfiguration());
       if (!fs.exists(path)) {
         HoodieTableMetaClient.initTableType(jsc.hadoopConfiguration(), 
tablePath, HoodieTableType.valueOf(tableType),
                 tableName, HoodieAvroPayload.class.getName());
       }
   
       // Create the write client to write some records in
       HoodieWriteConfig cfg = 
HoodieWriteConfig.newBuilder().withPath(tablePath)
               
.withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 
2).forTable(tableName)
               
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(IndexType.GLOBAL_BLOOM).build())
               
.withCompactionConfig(HoodieCompactionConfig.newBuilder().archiveCommitsWith(11,
 12).build()).build();
       HoodieWriteClient client = new HoodieWriteClient(jsc, cfg);
   
       /**
        * Write 1 (only inserts)
        */
       String newCommitTime = client.startCommit();
       logger.info("Starting commit " + newCommitTime);
   //    List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 
100);
       List<HoodieRecord> records = generateInserts(newCommitTime);
       JavaRDD<HoodieRecord> writeRecords = 
jsc.<HoodieRecord>parallelize(records, 1);
       client.upsert(writeRecords, newCommitTime);
   
       /**
        * Write 2 (updates)
        */
       newCommitTime = client.startCommit();
       logger.info("Starting commit " + newCommitTime);
   //    records.addAll(dataGen.generateUpdates(newCommitTime, 100));
       records.clear();
       records.addAll(generateUpdates(newCommitTime));
       writeRecords = jsc.<HoodieRecord>parallelize(records, 1);
       client.upsert(writeRecords, newCommitTime);
   
       /**
        * Schedule a compaction and also perform compaction on a MOR dataset
        */
       if (HoodieTableType.valueOf(tableType) == HoodieTableType.MERGE_ON_READ) 
{
         Option<String> instant = client.scheduleCompaction(Option.empty());
         JavaRDD<WriteStatus> writeStatues = client.compact(instant.get());
         client.commitCompaction(instant.get(), writeStatues, Option.empty());
       }
     }
   
     public List<HoodieRecord> generateInserts(String commitTime) throws 
IOException {
       List<HoodieRecord> inserts = new ArrayList<>();
   
       String partitionPath = "2016/03/15";
       HoodieKey key = new HoodieKey("1", partitionPath);
       HoodieRecord record = new HoodieRecord(key, 
HoodieTestDataGenerator.generateRandomValue(key, commitTime));
       inserts.add(record);
   
       return inserts;
     }
   
     public List<HoodieRecord> generateUpdates(String commitTime) throws 
IOException {
       List<HoodieRecord> updates = new ArrayList<>();
   
       String partitionPath = "2016/04/15";
       HoodieKey key = new HoodieKey("1", partitionPath);
       HoodieRecord record = new HoodieRecord(key, 
HoodieTestDataGenerator.generateRandomValue(key, commitTime));
       updates.add(record);
   
       return updates;
     }
   
   error log:
   16214 [Executor task launch worker-0] INFO  
org.apache.hudi.common.table.view.AbstractTableFileSystemView  - Building file 
system view for partition (2016/04/15)
   16214 [Executor task launch worker-0] INFO  
org.apache.hudi.common.table.view.AbstractTableFileSystemView  - #files found 
in partition (2016/04/15) =0, Time taken =0
   16214 [Executor task launch worker-0] INFO  
org.apache.hudi.common.table.view.AbstractTableFileSystemView  - 
addFilesToView: NumFiles=0, FileGroupsCreationTime=0, StoreTimeTaken=0
   16214 [Executor task launch worker-0] INFO  
org.apache.hudi.common.table.view.HoodieTableFileSystemView  - Adding 
file-groups for partition :2016/04/15, #FileGroups=0
   16214 [Executor task launch worker-0] INFO  
org.apache.hudi.common.table.view.AbstractTableFileSystemView  - Time to load 
partition (2016/04/15) =0
   16214 [Executor task launch worker-0] ERROR 
org.apache.hudi.table.HoodieCopyOnWriteTable  - Error upserting bucketType 
UPDATE for partition :0
   java.util.NoSuchElementException: No value present in Option
        at org.apache.hudi.common.util.Option.get(Option.java:88)
        at 
org.apache.hudi.io.HoodieMergeHandle.<init>(HoodieMergeHandle.java:74)
        at 
org.apache.hudi.table.HoodieCopyOnWriteTable.getUpdateHandle(HoodieCopyOnWriteTable.java:220)
        at 
org.apache.hudi.table.HoodieCopyOnWriteTable.handleUpdate(HoodieCopyOnWriteTable.java:177)
        at 
org.apache.hudi.table.HoodieCopyOnWriteTable.handleUpsertPartition(HoodieCopyOnWriteTable.java:257)
        at 
org.apache.hudi.HoodieWriteClient.lambda$upsertRecordsInternal$507693af$1(HoodieWriteClient.java:428)
        at 
org.apache.spark.api.java.JavaRDDLike$$anonfun$mapPartitionsWithIndex$1.apply(JavaRDDLike.scala:102)
        at 
org.apache.spark.api.java.JavaRDDLike$$anonfun$mapPartitionsWithIndex$1.apply(JavaRDDLike.scala:102)
        at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$26.apply(RDD.scala:843)
        at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$26.apply(RDD.scala:843)
        at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:336)
        at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:334)
        at 
org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:973)
        at 
org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:948)
        at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:888)
        at 
org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:948)
        at 
org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:694)
        at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
        at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:99)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to