wangxianghu commented on a change in pull request #2994: URL: https://github.com/apache/hudi/pull/2994#discussion_r643014504
########## File path: hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java ########## @@ -271,34 +272,39 @@ private void loadRecords(String partitionPath) throws Exception { final int parallelism = getRuntimeContext().getNumberOfParallelSubtasks(); final int maxParallelism = getRuntimeContext().getMaxNumberOfParallelSubtasks(); final int taskID = getRuntimeContext().getIndexOfThisSubtask(); + final long currentTimeMillis = System.currentTimeMillis(); for (HoodieBaseFile baseFile : latestBaseFiles) { + String commitTime = baseFile.getCommitTime(); + long commitTimeLong = instantDateFormat.parse(commitTime).getTime(); + // file was expired, needn't to fetch hoodieKeys + if (ttlTime > 0 && (commitTimeLong + ttlTime) < currentTimeMillis) { + continue; + } final List<HoodieKey> hoodieKeys; try { hoodieKeys = fileUtils.fetchRecordKeyPartitionPath(hadoopConf, new Path(baseFile.getPath())); } catch (Exception e) { // in case there was some empty parquet file when the pipeline - // crushes exceptionally. - LOG.error("Error when loading record keys from file: {}", baseFile); - continue; + // this exception is every important may cause duplication data , exception shouldn't be swallowed + LOG.error("Error when loading record keys from file: {}", baseFile, e); + throw e; } - hoodieKeys.forEach(hoodieKey -> { + hoodieKeys.stream().filter(hoodieKey -> recordKey.equals(hoodieKey.getRecordKey())).forEach(hoodieKey -> { Review comment: We are scanning all the base files in the partition — can we break out of this loop once we have found the location of the recordKey, instead of continuing to scan the remaining files? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org